Diffstat (limited to 'webmap-import')
-rwxr-xr-x | webmap-import | 1658
1 file changed, 557 insertions, 1101 deletions
diff --git a/webmap-import b/webmap-import index c1d07a3..e5a1426 100755 --- a/webmap-import +++ b/webmap-import @@ -2,7 +2,7 @@ #---------------------------------------------------------------------- # Backend utilities for the Klimatanalys Norr project (extract/import layers) -# Copyright © 2024 Guilhem Moulin <info@guilhem.se> +# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -18,319 +18,67 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. #---------------------------------------------------------------------- +# pylint: disable=invalid-name, missing-module-docstring, fixme + +from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY import os +from stat import S_ISREG +import sys +from fcntl import flock, LOCK_EX, LOCK_SH import logging import argparse -import tempfile import re -import math -from fnmatch import fnmatchcase +from datetime import datetime, timedelta, timezone, UTC +from math import modf from pathlib import Path +from time import monotonic as time_monotonic +from typing import Any, Optional, NoReturn +import traceback from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( - OF_VECTOR as GDAL_OF_VECTOR, - OF_ALL as GDAL_OF_ALL, - OF_READONLY as GDAL_OF_READONLY, - OF_UPDATE as GDAL_OF_UPDATE, - OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR, CE_None as GDAL_CE_None, - DCAP_CREATE as GDAL_DCAP_CREATE, - DCAP_VECTOR as GDAL_DCAP_VECTOR, DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS, DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS, DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS, ) -import osgeo.gdalconst as gdalconst -gdal.UseExceptions() +from osgeo import gdalconst import common - -# Wrapper around gdal.MajorObject.GetMetadataItem(name) -def getMetadataItem(o, k): - v = o.GetMetadataItem(k) - if v is not None and isinstance(v, str): - return v.upper() == 'YES' - else: - return False - -# Return kwargs and driver for OpenEx() -def setOpenExArgs(option_dict, flags=0): - kwargs = { 'nOpenFlags': GDAL_OF_VECTOR | flags } - - fmt = option_dict.get('format', None) - if fmt is None: - drv = None - else: - drv = gdal.GetDriverByName(fmt) - if drv is None: - raise Exception(f'Unknown driver name "{fmt}"') - elif not getMetadataItem(drv, GDAL_DCAP_VECTOR): - raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities') - kwargs['allowed_drivers'] = [ drv.ShortName ] - - oo = option_dict.get('open-options', None) - if oo is not None: - kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ] - return kwargs, drv - -# Open and return the output DS. It is created if create=False or -# create-options is a non-empty dictionary. 
-def openOutputDS(def_dict): - path = def_dict['path'] - kwargs, drv = setOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR) - try: - logging.debug('OpenEx(%s, %s)', path, str(kwargs)) - return gdal.OpenEx(path, **kwargs) - except RuntimeError as e: - if not (gdal.GetLastErrorType() >= gdalconst.CE_Failure and - gdal.GetLastErrorNo() == gdalconst.CPLE_OpenFailed): - # not an open error - raise e - - dso2 = None - try: - dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL | GDAL_OF_UPDATE) - except Exception: - pass - if dso2 is not None: - # path exists but can't be open with OpenEx(path, **kwargs) - raise e - - try: - dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL) - except Exception: - pass - if dso2 is not None: - # path exists but can't be open with OpenEx(path, **kwargs) - raise e - - dsco = def_dict.get('create-options', None) - if not def_dict.get('create', False) and dsco is None: - # not configured for creation - raise e - if drv is None or not getMetadataItem(drv, GDAL_DCAP_CREATE): - # not capable of creation - raise e - - if 'open_options' in kwargs: - # like ogr2ogr(1) - logging.warning('Destination\'s open options ignored ' + - 'when creating the output datasource') - - kwargs2 = { 'eType': gdal.GDT_Unknown } - if dsco is not None: - kwargs2['options'] = [ k + '=' + str(v) for k, v in dsco.items() ] - - logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs2['eType'], - ', options=' + str(kwargs2['options']) if 'options' in kwargs2 else '') - # XXX racy, a GDAL equivalent of O_EXCL would be nice - return drv.Create(path, 0, 0, 0, **kwargs2) - -# cf. ogr/ogrgeometry.cpp:OGRFromOGCGeomType() -def fromGeomTypeName(name): - name = name.upper() - - isMeasured = False - if name.endswith('M'): - isMeasured = True - name = name.removesuffix('M') - - convertTo3D = False - if name.endswith('Z'): - convertTo3D = True - name = name.removesuffix('Z') - - if name == 'POINT': - eGType = ogr.wkbPoint - elif name == 'LINESTRING': - eGType = ogr.wkbLineString - elif name == 'POLYGON': - eGType = ogr.wkbPolygon - elif name == 'MULTIPOINT': - eGType = ogr.wkbMultiPoint - elif name == 'MULTILINESTRING': - eGType = ogr.wkbMultiLineString - elif name == 'MULTIPOLYGON': - eGType = ogr.wkbMultiPolygon - elif name == 'GEOMETRYCOLLECTION': - eGType = ogr.wkbGeometryCollection - elif name == 'CIRCULARSTRING': - eGType = ogr.wkbCircularString - elif name == 'COMPOUNDCURVE': - eGType = ogr.wkbCompoundCurve - elif name == 'CURVEPOLYGON': - eGType = ogr.wkbCurvePolygon - elif name == 'MULTICURVE': - eGType = ogr.wkbMultiCurve - elif name == 'MULTISURFACE': - eGType = ogr.wkbMultiSurface - elif name == 'TRIANGLE': - eGType = ogr.wkbTriangle - elif name == 'POLYHEDRALSURFACE': - eGType = ogr.wkbPolyhedralSurface - elif name == 'TIN': - eGType = ogr.wkbTIN - elif name == 'CURVE': - eGType = ogr.wkbCurve - elif name == 'SURFACE': - eGType = ogr.wkbSurface - else: - eGType = ogr.wkbUnknown - - if convertTo3D: - eGType = ogr.GT_SetZ(eGType) - - if isMeasured: - eGType = ogr.GT_SetM(eGType) - - return eGType - -# Parse geometry type, cf. 
ogr2ogr_lib.cpp -def parseGeomType(name): - if name is None: - return ogr.wkbUnknown - name2 = name.upper() - - is3D = False - if name2.endswith('25D'): - name2 = name2[:-3] # alias - is3D = True - elif name2.endswith('Z'): - name2 = name2[:-1] - is3D = True - - if name2 == 'NONE': - eGType = ogr.wkbNone - elif name2 == 'GEOMETRY': - eGType = ogr.wkbUnknown - else: - eGType = fromGeomTypeName(name2) - if eGType == ogr.wkbUnknown: - raise Exception(f'Unknown geometry type "{name}"') - - if eGType != ogr.wkbNone and is3D: - eGType = ogr.GT_SetZ(eGType) - - return eGType - -# cf. ogr/ogr_core.h's enum OGRFieldType; -def parseFieldType(name): - if name is None: - raise Exception('parseFieldType(None)') - name2 = name.lower() - if name2 == 'integer': - # simple 32bit integer - return ogr.OFTInteger - elif name2 == 'integerlist': - # List of 32bit integers - return ogr.OFTIntegerList - elif name2 == 'real': - # Double Precision floating point - return ogr.OFTReal - elif name2 == 'reallist': - # List of doubles - return ogr.OFTRealList - elif name2 == 'string': - # String of ASCII chars - return ogr.OFTString - elif name2 == 'stringlist': - # Array of strings - return ogr.OFTStringList - elif name2 == 'binary': - # Raw Binary data - return ogr.OFTBinary - elif name2 == 'date': - # Date - return ogr.OFTDate - elif name2 == 'time': - # Time - return ogr.OFTTime - elif name2 == 'datetime': - # Date and Time - return ogr.OFTDateTime - elif name2 == 'integer64': - # Single 64bit integer - return ogr.OFTInteger64 - elif name2 == 'integer64list': - # List of 64bit integers - return ogr.OFTInteger64List - else: - raise Exception(f'Unknown field type "{name}"') - -# cf. ogr/ogr_core.h's enum OGRFieldSubType; -def parseSubFieldType(name): - if name is None: - raise Exception('parseSubFieldType(None)') - name2 = name.lower() - if name2 == 'none': - # No subtype. This is the default value. - return ogr.OFSTNone - elif name2 == 'bool': - # Boolean integer. Only valid for OFTInteger and OFTIntegerList. - return ogr.OFSTBoolean - elif name2 == 'int16': - # Signed 16-bit integer. Only valid for OFTInteger and OFTIntegerList. - return ogr.OFSTInt16 - elif name2 == 'float32': - # Single precision (32 bit) floating point. Only valid for OFTReal and OFTRealList. - return ogr.OFSTFloat32 - elif name2 == 'json': - # JSON content. Only valid for OFTString. - return ogr.OFSTJSON - elif name2 == 'uuid': - # UUID string representation. Only valid for OFTString. - return ogr.OFSTUUID - else: - raise Exception(f'Unknown field subtype "{name}"') - -# Parse timezone -TZ_RE = re.compile(r'(?:UTC\b)?([\+\-]?)([0-9][0-9]):?([0-9][0-9])', flags=re.IGNORECASE) -def parseTimeZone(tz): - if tz is None: - raise Exception('parseTimeZone(None)') - tz2 = tz.lower() - if tz2 == 'none': - return ogr.TZFLAG_UNKNOWN - elif tz2 == 'local': - return ogr.TZFLAG_LOCALTIME - elif tz2 == 'utc' or tz2 == 'gmt': - return ogr.TZFLAG_UTC - - m = TZ_RE.fullmatch(tz) - if m is None: - raise Exception(f'Invalid timezone "{tz}"') - tzSign = m.group(1) - tzHour = int(m.group(2)) - tzMinute = int(m.group(3)) - if tzHour > 14 or tzMinute >= 60 or tzMinute % 15 != 0: - raise Exception(f'Invalid timezone "{tz}"') - tzFlag = tzHour*4 + int(tzMinute/15) - if tzSign == '-': - tzFlag = 100 - tzFlag - else: - tzFlag += 100 - return tzFlag - -# Pretty-print timezone flag, cf. 
-# ogr/ogrutils.cpp:OGRGetISO8601DateTime() -def formatTZFlag(tzFlag): - if tzFlag is None: - raise Exception('printTimeZone(None)') - if tzFlag == ogr.TZFLAG_UNKNOWN: - return 'none' - elif tzFlag == ogr.TZFLAG_LOCALTIME: - return 'local' - elif tzFlag == ogr.TZFLAG_UTC: - return 'UTC' - - tzOffset = abs(tzFlag - 100) * 15; - tzHour = int(tzOffset / 60); - tzMinute = int(tzOffset % 60); - tzSign = '+' if tzFlag > 100 else '-' - return f'{tzSign}{tzHour:02}{tzMinute:02}' - -def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning): +from common import ( + BadConfiguration, + parse_config_dl, + escape_identifier, + escape_literal_str, + getSourcePathLockFileName +) +from common_gdal import ( + gdalVersionMin, + gdalGetMetadataItem, + getSRS, + getExtent, + parseGeomType, + parseFieldType, + parseSubFieldType, + parseTimeZone +) +from import_source import ( + openOutputDS, + createOutputLayer, + validateOutputLayer, + importSources, + ImportStatus +) +from export_mvt import exportMVT + +def setFieldIf(cond : bool, + attrName : str, + val : Any, + data : dict[str, Any], + fldName : str, + drvName : str, + log = logging.warning) -> None: + """Conditionally set a field""" if cond: data[attrName] = val else: @@ -341,19 +89,23 @@ def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning) log('Ignoring %s=%s on field "%s" (not supported by %s driver)', attrName, val2, fldName, drvName) -# Validate layer creation options and schema. The schema is modified in -# place with the parsed result. -# (We need the driver of the output dataset to determine capability on -# constraints.) -def validateSchema(layers, drvo=None, lco_defaults=None): +# pylint: disable-next=too-many-branches +def validate_schema(layers : dict[str, Any], + drvo : Optional[gdal.Driver] = None, + lco_defaults : Optional[dict[str, str]] = None) -> None: + """Validate layer creation options and schema. The schema is + modified in place with the parsed result. + (We need the driver of the output dataset to determine capability on + constraints.)""" + # Cf. 
https://github.com/OSGeo/gdal/blob/master/NEWS.md - if common.gdal_version_min(maj=3, min=7): + if gdalVersionMin(maj=3, min=7): # list of capability flags supported by the CreateField() API drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS) drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else [] drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags # GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0 - hasTZFlagSupport = common.gdal_version_min(maj=3, min=8) + hasTZFlagSupport = gdalVersionMin(maj=3, min=8) else: # list of flags supported by the OGRLayer::AlterFieldDefn() API drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS) @@ -364,9 +116,12 @@ def validateSchema(layers, drvo=None, lco_defaults=None): # cache driver capabilities drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags - drvoSupportsFieldNullable = 'Nullable' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS) - drvoSupportsFieldUnique = 'Unique' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS) - drvoSupportsFieldDefault = 'Default' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS) + drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and + gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS)) + drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and + gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS)) + drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and + gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS)) drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags for layername, layerdef in layers.items(): @@ -397,9 +152,9 @@ def validateSchema(layers, drvo=None, lco_defaults=None): for idx, fld_def in enumerate(fields): fld_name = fld_def.get('name', None) if fld_name is None or fld_name == '': - raise Exception(f'Field #{idx} has no name') + raise BadConfiguration(f'Field #{idx} has no name') if fld_name in fields_set: - raise Exception(f'Duplicate field "{fld_name}"') + raise BadConfiguration(f'Duplicate field "{fld_name}"') fields_set.add(fld_name) fld_def2 = { 'Name': fld_name } @@ -407,7 +162,7 @@ def validateSchema(layers, drvo=None, lco_defaults=None): k2 = k.lower() if k2 == 'name': pass - elif k2 == 'alternativename' or k2 == 'alias': + elif k2 in ('alternativename', 'alias'): setFieldIf(drvoSupportsFieldAlternativeName, 'AlternativeName', v, fld_def2, fld_name, drvo.ShortName, log=logging.debug) @@ -444,312 +199,12 @@ def validateSchema(layers, drvo=None, lco_defaults=None): setFieldIf(drvoSupportsFieldUnique, 'Unique', v, fld_def2, fld_name, drvo.ShortName) else: - raise Exception(f'Field "{fld_name}" has unknown key "{k}"') + raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"') fields[idx] = fld_def2 -# Return the decoded Spatial Reference System -def getSRS(srs_str): - if srs_str is None: - return - srs = osr.SpatialReference() - if srs_str.startswith('EPSG:'): - code = int(srs_str.removeprefix('EPSG:')) - srs.ImportFromEPSG(code) - else: - raise Exception(f'Unknown SRS {srs_str}') - logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName()) - return srs - -# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the -# given SRS. Like apps/ogr2ogr_lib.cpp, we segmentize the polygon to -# make sure it is sufficiently densified when transforming to source -# layer SRS for spatial filtering. 
-def getExtent(extent, srs=None): - if extent is None: - return - - if not (isinstance(extent, list) or isinstance(extent, tuple)) or len(extent) != 4: - raise Exception(f'Invalid extent {extent}') - elif srs is None: - raise Exception('Configured extent but no SRS') - - logging.debug('Configured extent in %s: %s', - srs.GetName(), ', '.join(map(str, extent))) - - ring = ogr.Geometry(ogr.wkbLinearRing) - ring.AddPoint_2D(extent[0], extent[1]) - ring.AddPoint_2D(extent[2], extent[1]) - ring.AddPoint_2D(extent[2], extent[3]) - ring.AddPoint_2D(extent[0], extent[3]) - ring.AddPoint_2D(extent[0], extent[1]) - - polygon = ogr.Geometry(ogr.wkbPolygon) - polygon.AddGeometry(ring) - - # we expressed extent as minX, minY, maxX, maxY (easting/northing - # ordered, i.e., in traditional GIS order) - srs2 = srs.Clone() - srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) - polygon.AssignSpatialReference(srs2) - polygon.TransformTo(srs) - - segment_distance_metre = 10 * 1000 - if srs.IsGeographic(): - dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor()) - polygon.Segmentize(dfMaxLength) - elif srs.IsProjected(): - dfMaxLength = segment_distance_metre / srs.GetLinearUnits() - polygon.Segmentize(dfMaxLength) - - return polygon - -# Validate the output layer against the provided SRS and creation options -def validateOutputLayer(lyr, srs=None, options=None): - ok = True - - # ensure the output SRS is equivalent - if srs is not None: - srs2 = lyr.GetSpatialRef() - # cf. apps/ogr2ogr_lib.cpp - srs_options = [ - 'IGNORE_DATA_AXIS_TO_SRS_AXIS_MAPPING=YES', - 'CRITERION=EQUIVALENT' - ] - if not srs.IsSame(srs2, srs_options): - logging.warning('Output layer "%s" has SRS %s,\nexpected %s', - lyr.GetName(), - srs2.ExportToPrettyWkt(), - srs.ExportToPrettyWkt()) - ok = False - - if options is None: - return ok - - layerDefn = lyr.GetLayerDefn() - n = layerDefn.GetGeomFieldCount() - if n != 1: - logging.warning('Output layer "%s" has %d != 1 geometry fields', layername, n) - - geom_type1 = lyr.GetGeomType() - geom_type2 = options['geometry-type'] - if geom_type1 != geom_type2: - logging.warning('Output layer "%s" has geometry type #%d (%s), expected #%d (%s)', - lyr.GetName(), - geom_type1, ogr.GeometryTypeToName(geom_type1), - geom_type2, ogr.GeometryTypeToName(geom_type2)) - ok = False - - fields = options.get('fields', None) - if fields is not None: - for fld in fields: - fldName = fld['Name'] - - idx = layerDefn.GetFieldIndex(fldName) - if idx < 0: - logging.warning('Output layer "%s" has no field named "%s"', - lyr.GetName(), fldName) - ok = False - continue - defn = layerDefn.GetFieldDefn(idx) - - if 'AlternativeName' in fld: - v1 = defn.GetAlternativeName() - v2 = fld['AlternativeName'] - if v1 != v2: - logging.warning('Field "%s" has AlternativeName="%s", expected "%s"', - fldName, v1, v2) - ok = False - - if 'Comment' in fld: - v1 = defn.GetComment() - v2 = fld['Comment'] - if v1 != v2: - logging.warning('Field "%s" has Comment="%s", expected "%s"', - fldName, v1, v2) - ok = False - - if 'Type' in fld: - v1 = defn.GetType() - v2 = fld['Type'] - if v1 != v2: - logging.warning('Field "%s" has Type=%d (%s), expected %d (%s)', - fldName, - v1, ogr.GetFieldTypeName(v1), - v2, ogr.GetFieldTypeName(v2)) - ok = False - - if 'SubType' in fld: - v1 = defn.GetSubType() - v2 = fld['SubType'] - if v1 != v2: - logging.warning('Field "%s" has SubType=%d (%s), expected %d (%s)', - fldName, - v1, ogr.GetFieldSubTypeName(v1), - v2, ogr.GetFieldSubTypeName(v2)) - ok = False - - if 'TZFlag' in fld: - 
v1 = defn.GetTZFlag() - v2 = fld['TZFlag'] - if v1 != v2: - logging.warning('Field "%s" has TZFlag=%d (%s), expected %d (%s)', - fldName, v1, formatTZFlag(v1), v2, formatTZFlag(v2)) - ok = False - - if 'Precision' in fld: - v1 = defn.GetPrecision() - v2 = fld['Precision'] - if v1 != v2: - logging.warning('Field "%s" has Precision=%d, expected %d', - fldName, v1, v2) - ok = False - - if 'Width' in fld: - v1 = defn.GetWidth() - v2 = fld['Width'] - if v1 != v2: - logging.warning('Field "%s" has Width=%d, expected %d', - fldName, v1, v2) - ok = False - - if 'Default' in fld: - v1 = defn.GetDefault() - v2 = fld['Default'] - if v1 != v2: - logging.warning('Field "%s" has Default="%s", expected "%s"', - fldName, v1, v2) - ok = False - - if 'Nullable' in fld: - v1 = bool(defn.IsNullable()) - v2 = fld['Nullable'] - if v1 != v2: - logging.warning('Field "%s" has Nullable=%s, expected %s', - fldName, v1, v2) - ok = False - - if 'Unique' in fld: - v1 = bool(defn.IsUnique()) - v2 = fld['Unique'] - if v1 != v2: - logging.warning('Field "%s" has Unique=%s, expected %s', - fldName, v1, v2) - ok = False - - return ok - -# Create output layer -def createOutputLayer(ds, layername, srs=None, options=None): - logging.info('Creating new destination layer "%s"', layername) - geom_type = options['geometry-type'] - lco = options.get('options', None) - - drv = ds.GetDriver() - if geom_type != ogr.wkbNone and drv.ShortName == 'PostgreSQL': - # “Important to set to 2 for 2D layers as it has constraints on the geometry - # dimension during loading.” - # — https://gdal.org/drivers/vector/pg.html#layer-creation-options - if ogr.GT_HasM(geom_type): - if ogr.GT_HasZ(geom_type): - dim = 'XYZM' - else: - dim = 'XYM' - elif ogr.GT_HasZ(geom_type): - dim = '3' - else: - dim = '2' - if lco is None: - lco = [] - lco = ['dim=' + dim] + lco # prepend dim= - - kwargs = { 'geom_type': geom_type } - if srs is not None: - kwargs['srs'] = srs - if lco is not None: - kwargs['options'] = lco - logging.debug('CreateLayer(%s, geom_type="%s"%s%s)', layername, - ogr.GeometryTypeToName(geom_type), - ', srs="' + kwargs['srs'].GetName() + '"' if 'srs' in kwargs else '', - ', options=' + str(kwargs['options']) if 'options' in kwargs else '') - lyr = dso.CreateLayer(layername, **kwargs) - if lyr is None: - raise Exception(f'Could not create destination layer "{layername}"') - - fields = options['fields'] - if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField): - raise Exception(f'Destination layer "{layername}" lacks field creation capability') - - # set up output schema - for fld in fields: - fldName = fld['Name'] - defn = ogr.FieldDefn() - defn.SetName(fldName) - - if 'AlternativeName' in fld: - v = fld['AlternativeName'] - logging.debug('Set AlternativeName="%s" on output field "%s"', str(v), fldName) - defn.SetAlternativeName(v) - - if 'Comment' in fld: - v = fld['Comment'] - logging.debug('Set Comment="%s" on output field "%s"', str(v), fldName) - defn.SetComment(v) - - if 'Type' in fld: - v = fld['Type'] - logging.debug('Set Type=%d (%s) on output field "%s"', - v, ogr.GetFieldTypeName(v), fldName) - defn.SetType(v) - - if 'SubType' in fld: - v = fld['SubType'] - logging.debug('Set SubType=%d (%s) on output field "%s"', - v, ogr.GetFieldSubTypeName(v), fldName) - defn.SetSubType(v) - - if 'TZFlag' in fld: - v = fld['TZFlag'] - logging.debug('Set TZFlag=%d (%s) on output field "%s"', - v, formatTZFlag(v), fldName) - defn.SetTZFlag(v) - - if 'Precision' in fld: - v = fld['Precision'] - logging.debug('Set Precision=%d on output 
field "%s"', v, fldName)
-            defn.SetPrecision(v)
-
-        if 'Width' in fld:
-            v = fld['Width']
-            logging.debug('Set Width=%d on output field "%s"', v, fldName)
-            defn.SetWidth(v)
-
-        if 'Default' in fld:
-            v = fld['Default']
-            logging.debug('Set Default=%s on output field "%s"', v, fldName)
-            defn.SetDefault(v)
-
-        if 'Nullable' in fld:
-            v = fld['Nullable']
-            logging.debug('Set Nullable=%s on output field "%s"', v, fldName)
-            defn.SetNullable(v)
-
-        if 'Unique' in fld:
-            v = fld['Unique']
-            logging.debug('Set Unique=%s on output field "%s"', v, fldName)
-            defn.SetUnique(v)
-
-        if lyr.CreateField(defn, approx_ok=False) != GDAL_CE_None:
-            raise Exception('Could not create field "{fldName}"')
-        logging.debug('Added field "%s" to output layer "%s"', fldName, layername)
-
-    # flush before calling StartTransaction() so we're not tryingn to
-    # rollback changes on a non-existing table
-    lyr.SyncToDisk()
-    return lyr
-
-# Setup output field mapping, modifying the sources dictionary in place.
-def setOutputFieldMap(defn, sources):
+def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]):
+    """Setup output field mapping, modifying the sources dictionary in place."""
     fieldMap = {}
     n = defn.GetFieldCount()
     for i in range(n):
@@ -771,7 +226,7 @@ def setOutputFieldMap(defn, sources):
         for ifld, ofld in fieldMap2.items():
             i = fieldMap.get(ofld, None)
             if i is None:
-                raise Exception(f'Ouput layer has no field named "{ofld}"')
+                raise RuntimeError(f'Output layer has no field named "{ofld}"')
             fieldMap2[ifld] = i
         source_import['field-map'] = fieldMap2
@@ -785,403 +240,358 @@ def setOutputFieldMap(defn, sources):
                rules = [rules]
            for idx, rule in enumerate(rules):
                if rule is None or not isinstance(rule, dict):
-                    raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+                    raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                if 'type' not in rule:
                    ruleType = rule['type'] = 'literal'
                else:
                    ruleType = rule['type']
                if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
-                        ruleType is None or ruleType not in ['literal', 'regex']):
-                    raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+                        ruleType is None or ruleType not in ('literal', 'regex')):
+                    raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                if ruleType == 'regex':
                    rule['replace'] = re.compile(rule['replace'])
                rules[idx] = ( rule['replace'], rule['with'] )

-# Escape the given identifier, cf.
-# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id() -def escapeIdentifier(identifier): - if '\x00' in identifier: - raise Exception(f'Invalid identifier "{identifier}"') - # SQL:1999 delimited identifier - return '"' + identifier.replace('"', '""') + '"' - -# Clear the given layer (wipe all its features) -def clearLayer(ds, lyr): - n = -1 - if lyr.TestCapability(ogr.OLCFastFeatureCount): - n = lyr.GetFeatureCount(force=0) - if n == 0: - # nothing to clear, we're good - return - layername_esc = escapeIdentifier(lyr.GetName()) - - # XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer - drv = ds.GetDriver() - if drv.ShortName == 'PostgreSQL': - # https://www.postgresql.org/docs/15/sql-truncate.html - query = 'TRUNCATE TABLE {table} CONTINUE IDENTITY RESTRICT' - op = 'Truncating' - else: - query = 'DELETE FROM {table}' - op = 'Clearing' - logging.info('%s table %s (former feature count: %s)', op, - layername_esc, str(n) if n >= 0 else 'unknown') - ds.ExecuteSQL(query.format(table=layername_esc)) - -# Extract an archive file into the given destination directory. -def extractArchive(path, destdir, fmt=None, patterns=None, exact_matches=None): - if fmt is None: - suffix = path.suffix - if suffix is None or suffix == '' or not suffix.startswith('.'): - raise Exception(f'Could not infer archive format from "{path}"') - fmt = suffix.removeprefix('.') - - fmt = fmt.lower() - logging.debug('Unpacking %s archive %s into %s', fmt, path, destdir) - - if fmt == 'zip': - from zipfile import ZipFile - logging.debug('Opening %s as ZipFile', path) - with ZipFile(path, mode='r') as z: - namelist = listArchiveMembers(z.namelist(), - patterns=patterns, exact_matches=exact_matches) - z.extractall(path=destdir, members=namelist) - else: - raise Exception(f'Unknown archive format "{fmt}"') - -# List archive members matching the given parterns and/or exact matches -def listArchiveMembers(namelist, patterns, exact_matches=None): - if patterns is None and exact_matches is None: - # if neither patterns nor exact_matches are given we'll extract the entire archive - return namelist - if patterns is None: - patterns = [] - if exact_matches is None: - exact_matches = [] - - members = [] - for name in namelist: - ok = False - if name in exact_matches: - # try exact matches first - logging.debug('Listed archive member %s (exact match)', name) - members.append(name) - ok = True - continue - # if there are no exact matches, try patterns one by one in the supplied order - for pat in patterns: - if fnmatchcase(name, pat): - logging.debug('Listed archive member %s (matching pattern "%s")', name, pat) - members.append(name) - ok = True - break - if not ok: - logging.debug('Ignoring archive member %s', name) - return members - -# Import a source layer -def importSource(lyr, path=None, unar=None, args={}, cachedir=None, extent=None): - if unar is None: - return importSource2(lyr, str(path), args=args, - basedir=cachedir, extent=extent) - - if cachedir is not None: - path = cachedir.joinpath(path) - - ds_srcpath = Path(args['path']) - if ds_srcpath.is_absolute(): - # treat absolute paths as relative to the archive root - logging.warning('%s is absolute, removing leading anchor', ds_srcpath) - ds_srcpath = ds_srcpath.relative_to(ds_srcpath.anchor) - ds_srcpath = str(ds_srcpath) - - with tempfile.TemporaryDirectory() as tmpdir: - logging.debug('Created temporary directory %s', tmpdir) - extractArchive(path, tmpdir, - fmt=unar.get('format', None), - patterns=unar.get('patterns', None), 
- exact_matches=[ds_srcpath]) - return importSource2(lyr, ds_srcpath, args=args, - basedir=Path(tmpdir), extent=extent) - -# Validate field value mapping -def setFieldMapValue(fld, idx, val): - if val is None: - if not fld.IsNullable(): - logging.warning('Field "%s" is not NULLable but remaps NULL', fld.GetName()) - return None - - fldType = fld.GetType() - if fldType == ogr.OFTInteger or fldType == ogr.OFTInteger64: - if isinstance(val, int): - return val - elif fldType == ogr.OFTString: - if isinstance(val, str): - return val - elif fldType == ogr.OFTBinary: - if isinstance(val, bytes): - return val - elif fldType == ogr.OFTReal: - if isinstance(val, int): - return float(val) - elif isinstance(val, float): - return val - - raise Exception(f'Field "{fld.GetName()}" mapping #{idx} has incompatible type for {ogr.GetFieldTypeName(fldType)}') - -# Import a source layer (already extracted) -# This is more or less like ogr2ogr/GDALVectorTranslate() but we roll -# out our own (slower) version because GDALVectorTranslate() insists in -# calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403 -# while we want a single transaction for the entire desination layer, -# including truncation, source imports, and metadata changes. -def importSource2(lyr_dst, path, args={}, basedir=None, extent=None): - kwargs, _ = setOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) - path2 = path if basedir is None else str(basedir.joinpath(path)) - - logging.debug('OpenEx(%s, %s)', path2, str(kwargs)) - ds = gdal.OpenEx(path2, **kwargs) - if ds is None: - raise Exception(f'Could not open {path2}') - - layername = args.get('layername', None) - if layername is None: - idx = 0 - lyr = ds.GetLayerByIndex(idx) - msg = '#' + str(idx) - if lyr is not None: - layername = lyr.GetName() - msg += ' ("' + layername + '")' - else: - lyr = ds.GetLayerByName(layername) - msg = '"' + layername + '"' +def processOutputLayer(ds : gdal.Dataset, + layername : str, + layerdef : dict[str,Any], + srs : Optional[osr.SpatialReference] = None, + cachedir : Path|None = None, + extent : ogr.Geometry|None = None, + dsTransaction : bool = False, + lyrcache : ogr.Layer|None = None, + force : bool = False) -> ImportStatus: + """Process an output layer.""" + + logging.info('Processing output layer "%s"', layername) + lyr = ds.GetLayerByName(layername) if lyr is None: - raise Exception(f'Could not get requested layer {msg} from {path2}') - - logging.info('Importing layer %s from "%s"', msg, path) - canIgnoreFields = lyr.TestCapability(ogr.OLCIgnoreFields) - - srs = lyr.GetSpatialRef() - if srs is None: - raise Exception('Source layer has no SRS') - - srs_dst = lyr_dst.GetSpatialRef() - if srs_dst is None: - logging.warning('Destination has no SRS, skipping coordinate transformation') - ct = None - elif srs_dst.IsSame(srs): - logging.debug('Both source and destination have the same SRS (%s), skipping coordinate transformation', - srs_dst.GetName()) - ct = None - else: - # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0 - # when possible, see apps/ogr2ogr_lib.cpp - logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)', - srs.GetName(), srs_dst.GetName()) - ct = osr.CoordinateTransformation(srs, srs_dst) - if ct is None: - raise Exception(f'Could not create transformation from source SRS ({srs.GetName()}) ' - + f'to destination SRS ({srs_dst.GetName()})') - - defn = lyr.GetLayerDefn() - geomFieldCount = defn.GetGeomFieldCount() - if geomFieldCount != 1: # TODO Add support for 
multiple geometry fields
-        logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
+        raise RuntimeError(f'Failed to create output layer "{layername}"??')
+    if not lyr.TestCapability(ogr.OLCSequentialWrite):
+        raise RuntimeError(f'Output layer "{layername}" has no working '
+                           'CreateFeature() method')
+
+    sources = layerdef['sources']
+    if not (lyrcache is None or force or
+            areSourceFilesNewer(layername, sources=sources,
+                                lyrcache=lyrcache,
+                                cachedir=cachedir)):
+        logging.info('Output layer "%s" is up to date, skipping', layername)
+        return ImportStatus.IMPORT_NOCHANGE
+
+    validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
+
+    description = layerdef.get('description', None)
+    if (description is not None and
+            lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+        logging.warning('Could not set description metadata')
+
+    # setup output field mapping in the sources dictionary
+    setOutputFieldMap(lyr.GetLayerDefn(), sources)
+
+    return importSources(dso=ds, lyr=lyr, sources=sources,
+                         cachedir=cachedir, extent=extent,
+                         dsoTransaction=dsTransaction,
+                         lyrcache=lyrcache,
+                         force=force)
+
+def validate_sources(layers : dict[str, Any]) -> None:
+    """Mangle and validate layer sources and import definitions"""
+    toremove = set()
+    for layername, layerdefs in layers.items():
+        sources = layerdefs.get('sources', None)
+        if sources is None or len(sources) < 1:
+            logging.warning('Output layer "%s" has no definition, skipping', layername)
+            toremove.add(layername)
+            continue

-    fieldCount = defn.GetFieldCount()
-    fieldMap = [-1] * fieldCount
-    fields = args['field-map']
-    fieldSet = set()
+        for idx, layerdef in enumerate(sources):
+            importdef = layerdef.get('import', None)
+            if importdef is None:
+                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+                                       'has no import definition')

-    for i in range(fieldCount):
-        fld = defn.GetFieldDefn(i)
-        fldName = fld.GetName()
-        fieldMap[i] = v = fields.get(fldName, -1)
-        fieldSet.add(fldName)
-
-        if v < 0 and canIgnoreFields:
-            # call SetIgnored() on unwanted source fields
-            logging.debug('Set Ignored=True on output field "%s"', fldName)
-            fld.SetIgnored(True)
-
-    count0 = -1
-    if lyr.TestCapability(ogr.OLCFastFeatureCount):
-        count0 = lyr.GetFeatureCount(force=0)
-
-    if count0 == 0 and len(fieldSet) == 0:
-        # skip the below warning in some cases (e.g., GeoJSON source)
-        logging.info('Source layer "%s" has no fields nor features, skipping', layername)
-        return
-
-    logging.debug('Field map: %s', str(fieldMap))
-    for fld in fields:
-        if not fld in fieldSet:
-            logging.warning('Source layer "%s" has no field named "%s", ignoring', layername, fld)
-
-    count1 = -1
-    if args.get('spatial-filter', True) and extent is not None:
-        if extent.GetSpatialReference().IsSame(srs):
-            extent2 = extent
-        else:
-            extent2 = extent.Clone()
-            if extent2.TransformTo(srs) != ogr.OGRERR_NONE:
-                raise Exception(f'Could not transform extent {extent.ExportToWkt()} to {srs.GetName()}')
+            sourcedef = layerdef.get('source', None)
+            unar = None if sourcedef is None else sourcedef.get('unar', None)
+            src = None if sourcedef is None else sourcedef.get('path', None)

-        #logging.debug('Applying extent: %s', extent2.ExportToWkt())
-        lyr.SetSpatialFilter(extent2)
+            ds_srcpath = importdef.get('path', None)
+            if src is None and unar is None and ds_srcpath is not None:
+                # fallback to import:path if there is no unarchiving recipe
+                src = ds_srcpath
+            if unar is not None and ds_srcpath is None:
+                raise BadConfiguration(f'Source
#{idx} of output layer "{layername}" ' + 'has no import source path') + if src is None: + raise BadConfiguration(f'Source #{idx} of output layer "{layername}" ' + 'has no source path') + layerdef['source'] = { 'path': src, 'unar': unar } - if lyr.TestCapability(ogr.OLCFastFeatureCount): - count1 = lyr.GetFeatureCount(force=0) + for layername in toremove: + layers.pop(layername) + +def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int, + name : str, + typ : int, + subtyp : int = ogr.OFSTNone, + width : int = 0, + unique : Optional[bool] = None, + nullable : Optional[bool] = None) -> bool: + """Validate field #idx from the layer cache table.""" + n = defn.GetFieldCount() + if idx >= n: + return False + defn = defn.GetFieldDefn(idx) + + b = True + name2 = defn.GetName() + if name2 != name: + logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name) + b = False + + if nullable is not None and defn.IsNullable() != nullable: + # non-fatal + logging.warning('Layer cache\'s field #%d ("%s") %s nullable', + idx, name2, 'is' if defn.IsNullable() else 'isn\'t') + + if unique is not None and defn.IsUnique() != unique: + # non-fatal + logging.warning('Layer cache\'s field #%d ("%s") %s unique', + idx, name2, 'is' if defn.IsUnique() else 'isn\'t') + + typ2 = defn.GetType() + if typ2 != typ: + logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2, + ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ)) + b = False + + subtyp2 = defn.GetSubType() + if subtyp2 != subtyp: + logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2, + ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp)) + b = False + + width2 = defn.GetWidth() + if width2 != 0 and (width == 0 or width2 < width): + # non-fatal + logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)', + idx, name2, width2, width) + return b + +def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool: + """Validate layer cache table.""" + drvName = ds.GetDriver().ShortName + if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB() + logging.warning('Unsupported cache layer for output driver %s', drvName) + return False + lyr = ds.GetLayerByName(name) + if lyr is None: + logging.warning('Table "%s" does not exist', name) + return False - if count0 >= 0: - if count1 >= 0: - logging.info('Source layer "%s" has %d features (of which %d within extent)', - layername, count0, count1) - else: - logging.info('Source layer "%s" has %d features', layername, count0) - - # build a list of triplets (field index, replacement_for_null, [(from_value, to_value), …]) - valueMap = [] - for fldName, rules in args.get('value-map', {}).items(): - i = defn.GetFieldIndex(fldName) - if i < 0: - raise Exception(f'Source layer "{layername}" has no field named "{fldName}"') - if fieldMap[i] < 0: - logging.warning('Ignored source field "%s" has value map', fldName) - continue +# if not (lyr.TestCapability(ogr.OLCRandomWrite) and +# gdalVersionMin(maj=3, min=7) and +# lyr.TestCapability(ogr.OLCUpdateFeature)): +# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, ' +# 'ignoring cache', name) +# return None - hasNullReplacement = False - nullReplacement = None - mapping = [] - fld = defn.GetFieldDefn(i) - for idx, (rFrom, rTo) in enumerate(rules): - # use fld for both 'from' and 'to' (the types must match, casting is not allowed in the mapping) - if rFrom is None: - if hasNullReplacement: - logging.warning('Field 
"%s" has duplicate NULL replacement', fld.GetName()) - else: - setFieldMapValue(fld, idx, None) # validate NULL - rTo = setFieldMapValue(fld, idx, rTo) - hasNullReplacement = True - nullReplacement = rTo - elif isinstance(rFrom, re.Pattern): - # validate but keep the rFrom regex - setFieldMapValue(fld, idx, str(rFrom)) - rTo = setFieldMapValue(fld, idx, rTo) - mapping.append( (rFrom, rTo, 1) ) - else: - rFrom = setFieldMapValue(fld, idx, rFrom) - rTo = setFieldMapValue(fld, idx, rTo) - mapping.append( (rFrom, rTo, 0) ) - - if nullReplacement is not None or len(mapping) > 0: - valueMap.append( (i, nullReplacement, mapping) ) - - bValueMap = len(valueMap) > 0 - defn = None - - defn_dst = lyr_dst.GetLayerDefn() - eGType_dst = defn_dst.GetGeomType() - eGType_dst_HasZ = ogr.GT_HasZ(eGType_dst) - eGType_dst_HasM = ogr.GT_HasM(eGType_dst) - dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown - - if bValueMap: - valueMapCounts = [0] * fieldCount - - featureCount = 0 - mismatch = {} - feature = lyr.GetNextFeature() - while feature is not None: - if bValueMap: - for i, nullReplacement, mapping in valueMap: - if not feature.IsFieldSet(i): - continue - elif feature.IsFieldNull(i): - if nullReplacement is not None: - # replace NULL with non-NULL value - feature.SetField(i, nullReplacement) - valueMapCounts[i] += 1 - continue + defn = lyr.GetLayerDefn() + fields = [ + { 'name': 'layername', 'typ': ogr.OFTString, + 'nullable': False, 'unique': True, 'width': 255 }, + { 'name': 'last_updated', 'typ': ogr.OFTDateTime, + 'nullable': False }, + { 'name': 'fingerprint', 'typ': ogr.OFTBinary, + 'nullable': False, 'width': 32 }, + ] + m = len(fields) + n = defn.GetFieldCount() + if n < m: + # this is fatal, and `all(bs)` is False so we return False below + logging.warning('Layer cache "%s" has %d < %d fields', name, n, m) + elif n != m: + logging.warning('Layer cache "%s" has %d != %d fields', name, n, m) + bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ] + if not all(bs): + return False - v = feature.GetField(i) - for rFrom, rTo, rType in mapping: - if rType == 0: - # literal - if v != rFrom: - continue - elif rType == 1: - # regex - m = rFrom.fullmatch(v) - if m is None: - continue - elif rTo is not None: - rTo = rTo.format(*m.groups()) - else: - raise Exception(str(rType)) + n = defn.GetGeomFieldCount() + if n > 0: + geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName()) + for i in range(n) ] + logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s', + name, n, ', '.join(geomFieldNames)) + + if gdalVersionMin(maj=3, min=5): + style = lyr.GetStyleTable() + if style is not None: + logging.warning('Layer cache "%s" has a style table "%s"', + name, style.GetLastStyleName()) + return True + +def areSourceFilesNewer(layername : str, + sources : dict[str,Any], + lyrcache : ogr.Layer, + cachedir : Optional[Path] = None) -> bool: + """Return a boolean indicating whether the layer cache is up to date with + respect to the source files found on disk. 
That is, the last modification
+    and last changed time of each source file needs to be equal or lower than
+    the `last_updated` value found in the layer cache."""
+
+    source_paths = set()
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        source_paths.add(source_path)
+    if len(source_paths) == 0:
+        return False
+
+    t = None
+    mtimes_ns = {}
+    for source_path in source_paths:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+            # take max(mtime, ctime): if we lock source paths any update after
+            # acquiring the lock will yield a value larger than time.time_ns()
+            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+            if t is None or t < t2:
+                t = t2
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            return True
+    assert t is not None
+
+    attributeFilter = 'layername = ' + escape_literal_str(layername)
+    logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
+    lyrcache.SetAttributeFilter(attributeFilter)
+
+    feature = lyrcache.GetNextFeature()
+    if feature is None:
+        # not in cache
+        return True
+
+    if not feature.IsFieldSetAndNotNull(1):
+        ret = True
+    else:
+        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+        # [ year, month, day, hour, minute, second, timezone flag ]
+        dt = feature.GetFieldAsDateTime(1)
+        if not gdalVersionMin(maj=3, min=8):
+            tz = None # assume local time
+        elif dt[6] == ogr.TZFLAG_UNKNOWN:
+            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+                            'field #%d "%s", assuming local time', 1,
+                            feature.GetDefnRef().GetFieldDefn(1).GetName())
+            tz = None
+        elif dt[6] == ogr.TZFLAG_LOCALTIME:
+            tz = None
+        elif dt[6] == ogr.TZFLAG_UTC:
+            tz = UTC
+        else:
+            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+        ms, s = modf(dt[5])
+        dt = datetime(
+            year=dt[0],    # including century
+            month=dt[1],   # 01 ≤ month ≤ 12
+            day=dt[2],     # 01 ≤ day ≤ 31
+            hour=dt[3],    # 00 ≤ hour ≤ 23
+            minute=dt[4],  # 00 ≤ minute ≤ 59
+            second=int(s), # 00 ≤ second ≤ 59
+            microsecond=round(ms*1000000),
+            tzinfo=tz
+        )
+        fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
+        logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
+                      layername,
+                      dt.isoformat(timespec='microseconds'),
+                      fpr.hex() if fpr is not None else 'NULL')
+        ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+    if lyrcache.GetNextFeature() is not None:
+        raise RuntimeError(f'Duplicate key {layername}')
+
+    if not ret:
+        for source_path, mtime_ns in sorted(mtimes_ns.items()):
+            # XXX datetime.fromtimestamp() doesn't support nanosecond input
+            # https://github.com/python/cpython/issues/59648
+            mtime = (mtime_ns // 1000) / 1000000.
+            dt = datetime.fromtimestamp(mtime)
+            logging.info('Source file %s is unchanged (last modification time %s)',
+                         source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return ret
+
+def getLastMTimes(layerdefs : dict[str,Any], basedir : Optional[Path] = None) -> dict[str,int]:
+    """Return a dict mapping source paths to their last modification time
+    (as a timestamp in milliseconds)."""
+    ret = {}
+    for layerdef in layerdefs:
+        for source in layerdef['sources']:
+            source_path = source['source']['path']
+            if source_path in ret:
+                continue
+            path = source_path if basedir is None else str(basedir.joinpath(source_path))
+            try:
+                st = os.stat(path)
+                if not S_ISREG(st.st_mode):
+                    raise FileNotFoundError
+                ret[source_path] = st.st_mtime_ns // 1000000
+            except (OSError, ValueError):
+                #logging.warning('Could not stat(%s)', path)
+                pass
+    return ret
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors. We could do that one layerdef at a time (one output layer at a
+    time) to reduce the time during which the sources are prevented from being
+    updated/downloaded, but there is some value in having consistency across the
+    whole import process."""
+    umask = os.umask(0o002)
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                if source_path in ret:
+                    continue
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+        os.umask(umask)
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+    """Release shared locks on the source paths. Closed FDs are removed from
+    the dictionary in place."""
+    toremove = set()
+    for path, lock_fd in lock_fds.items():
+        try:
+            os.close(lock_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not release lock for %s', path)
+        else:
+            logging.debug('Released lock for %s', path)
+            toremove.add(path)
+    for path in toremove:
+        lock_fds.pop(path)

-if __name__ == '__main__':
+# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
+def main() -> NoReturn:
     common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

     parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
@@ -1189,135 +599,181 @@
                         help=f'cache directory (default: {os.curdir})')
     parser.add_argument('--debug', action='count', default=0,
                         help=argparse.SUPPRESS)
+    parser.add_argument('--lockfile', default=None,
+                        help='obtain an exclusive lock before processing')
+    parser.add_argument('--lockdir-sources', default=None,
+                        help='optional directory for lock files to source paths')
+    parser.add_argument('--mvtdir', default=None,
+                        help='optional directory for Mapbox Vector Tiles (MVT)')
+    parser.add_argument('--mvt-compress', default=False, action='store_true',
+                        help='whether to compress Mapbox Vector Tiles (MVT) files')
+    parser.add_argument('--force', default=False, action='store_true',
+                        help='import even if no new changes were detected')
     parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
     args = parser.parse_args()

-    if args.debug > 0:
+    if args.debug > 0: # pylint: disable=duplicate-code
         logging.getLogger().setLevel(logging.DEBUG)
         if args.debug > 1:
             gdal.ConfigurePythonLogging(enable_debug=True)

-    common.load_config(groupnames=None if args.groupname == [] else args.groupname)
+    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

     # validate configuration
-    if 'dataset' not in common.config:
-        raise Exception('Configuration does not specify output dataset')
-
-    layers = common.config.get('layers', {})
-    for layername, layerdefs in layers.items():
-        for idx, layerdef in enumerate(layerdefs['sources']):
-            importdef = layerdef.get('import', None)
-            if importdef is None:
-                raise Exception(f'Output layer "{layername}" source #{idx} has no import definition')
-
-            sourcedef = layerdef.get('source', None)
-            unar = None if sourcedef is None else sourcedef.get('unar', None)
-            src = None if sourcedef is None else
sourcedef['cache'].get('path', None) + if 'dataset' not in config: + raise BadConfiguration('Configuration does not specify output dataset') - ds_srcpath = importdef.get('path', None) - if src is None and unar is None and ds_srcpath is not None: - # fallback to importe:path if there is no unarchiving receipe - src = Path(ds_srcpath) - if unar is not None and ds_srcpath is None: - raise Exception(f'Output layer "{layername}" source #{idx} has no import source path') - if src is None: - raise Exception(f'Output layer "{layername}" source #{idx} has no source path') - layerdef['source'] = { 'path': src, 'unar': unar } + layers = config.get('layers', {}) + validate_sources(layers) # set global GDAL/OGR configuration options - for pszKey, pszValue in common.config.get('GDALconfig', {}).items(): + for pszKey, pszValue in config.get('GDALconfig', {}).items(): logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue) gdal.SetConfigOption(pszKey, pszValue) # open output dataset (possibly create it first) - dso = openOutputDS(common.config['dataset']) + dso = openOutputDS(config['dataset']) - validateSchema(layers, - drvo=dso.GetDriver(), - lco_defaults=common.config['dataset'].get('create-layer-options', None)) + validate_schema(layers, + drvo=dso.GetDriver(), + lco_defaults=config['dataset'].get('create-layer-options', None)) # get configured Spatial Reference System and extent - srs = getSRS(common.config.get('SRS', None)) - extent = getExtent(common.config.get('extent', None), srs=srs) + srs = getSRS(config.get('SRS', None)) + extent = getExtent(config.get('extent', None), srs=srs) - cachedir = Path(args.cachedir) if args.cachedir is not None else None - rv = 0 + if args.lockfile is not None: + # obtain an exclusive lock and don't release it until exiting the program + lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) + logging.debug('flock("%s", LOCK_EX)', args.lockfile) + flock(lock_fd, LOCK_EX) + + # create all output layers before starting the transaction for layername, layerdef in layers.items(): - sources = layerdef['sources'] - if sources is None or len(sources) < 1: - logging.warning('Output layer "%s" has no definition, skipping', layername) + lyr = dso.GetLayerByName(layername) + if lyr is not None: + # TODO dso.DeleteLayer(layername) if --overwrite and + # dso.TestCapability(ogr.ODsCDeleteLayer) + # (Sets OVERWRITE=YES for PostgreSQL and GPKG.) 
continue + if not dso.TestCapability(ogr.ODsCCreateLayer): + raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not ' + 'support layer creation') + createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None)) + + if args.mvtdir is not None: + args.mvtdir = Path(args.mvtdir) + if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it + raise RuntimeError('Invalid value for --mvtdir') + args.mvtdir.parent.mkdir(parents=True, exist_ok=True) + + if args.cachedir is not None: + args.cachedir = Path(args.cachedir) + if args.lockdir_sources is None: + sourcePathLocks = None + else: + sourcePathLocks = lockSourcePaths(layerdefs=layers.values(), + lockdir=args.lockdir_sources) + + if (dso.TestCapability(ogr.ODsCTransactions) and + # we need SAVEPOINT support + dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')): + logging.debug('Starting transaction') + dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE + else: + logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs', + dso.GetDriver().ShortName) + dsoTransaction = False + + # validate layer cache + lyr_cache = config['dataset'].get('layercache', None) + if lyr_cache is None: + pass + elif validateCacheLayer(dso, lyr_cache): + lyr_cache = dso.GetLayerByName(lyr_cache) + else: + if not args.force: + logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache) + args.force = True + lyr_cache = None - logging.info('Processing output layer "%s"', layername) - transaction = False - try: - # get output layer - outLayerIsNotEmpty = True - lco = layerdef.get('create', None) - lyr = dso.GetLayerByName(layername) - if lyr is not None: - # TODO dso.DeleteLayer(layername) if --overwrite and dso.TestCapability(ogr.ODsCDeleteLayer) - # (Sets OVERWRITE=YES for PostgreSQL and GPKG.) 
-                validateOutputLayer(lyr, srs=srs, options=lco)
-                # TODO bail out if all source files are older than lyr's last_change
-            elif not dso.TestCapability(ogr.ODsCCreateLayer):
-                raise Exception(f'Output driver {dso.GetDriver().ShortName} does not support layer creation')
-            elif lco is None or len(lco) < 1:
-                raise Exception(f'Missing schema for new output layer "{layername}"')
-            else:
-                lyr = createOutputLayer(dso, layername, srs=srs, options=lco)
-                outLayerIsNotEmpty = False
-
-            if not lyr.TestCapability(ogr.OLCSequentialWrite):
-                raise Exception(f'Output layer "{layername}" has no working CreateFeature() method')
-
-            # setup output field mapping in the sources dictionary
-            setOutputFieldMap(lyr.GetLayerDefn(), sources)
-
-            # start transaction if possible
-            if lyr.TestCapability(ogr.OLCTransactions):
-                logging.debug('Starting transaction')
-                transaction = lyr.StartTransaction() == ogr.OGRERR_NONE
-            else:
-                logging.warning('Unsafe update, output layer "%s" does not support transactions', layername)
-
-            if outLayerIsNotEmpty:
-                # clear non-empty output layer
-                clearLayer(dso, lyr)
-
-            description = layerdef.get('description', None)
-            if description is not None and lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None:
-                logging.warning('Could not set description metadata')
-
-            for source in sources:
-                # import source layers
-                importSource(lyr, **source['source'], args=source['import'],
-                             cachedir=cachedir, extent=extent)
-
-            if transaction:
-                # commit transaction
-                logging.debug('Committing transaction')
-                transaction = False
-                if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not commit transaction')
-                    rv = 1
-
-        except Exception:
-            if transaction:
-                logging.error('Exception occured in transaction, rolling back')
-                try:
-                    if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+    rv = 0
+    try:
+        r = {}
+        n = 0
+        start = time_monotonic()
+        for layername, layerdef in layers.items():
+            r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
+                                                   srs=srs,
+                                                   cachedir=args.cachedir,
+                                                   extent=extent,
+                                                   dsTransaction=dsoTransaction,
+                                                   lyrcache=lyr_cache,
+                                                   force=args.force)
+            n += 1
+            logging.info('Import result status for layer "%s": %s', layername, str(r0))
+            if r0 == ImportStatus.IMPORT_ERROR:
+                rv = 1
+                if dsoTransaction:
+                    dsoTransaction = False
+                    logging.debug('Rolling back transaction')
+                    # no need to catch the exception here
+                    if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                         logging.error('Could not rollback transaction')
-                except RuntimeError:
-                    logging.exception('Could not rollback transaction')
-            logging.exception('Could not import layer "%s"', layername)
-            rv = 1
-
-        finally:
-            # close output layer
-            lyr = None
-
-    dso = None
-    srs = None
-    extent = None
-    exit(rv)
+                break
+        elapsed = time_monotonic() - start
+        logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))
+
+        # get mtimes before releasing the source locks
+        last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
+
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
+        export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
+        if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
+            pass
+        elif len(export_layers) == 0:
+            logging.warning('--mvtdir option used but no layer has a publication definition')
+        elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
+              and args.mvtdir.is_dir()):
+            logging.info('Skipping MVT export for group %s (no changes)',
+                         ', '.join(args.groupname) if args.groupname is not None else '*')
+        else:
+            exportMVT(dso,
+                      layers=export_layers,
+                      sources=parse_config_dl(config.get('downloads', [])),
+                      license_info=config.get('license-info', {}),
+                      last_modified=last_modified,
+                      dst=args.mvtdir,
+                      default_options=config.get('vector-tiles', None),
+                      compress=args.mvt_compress)
+
+        if dsoTransaction:
+            dsoTransaction = False
+            logging.debug('Committing transaction')
+            if dso.CommitTransaction() != ogr.OGRERR_NONE:
+                logging.error('Could not commit transaction')
+                rv = 1
+
+    except Exception: # pylint: disable=broad-exception-caught
+        if dsoTransaction:
+            logging.exception('Exception occurred within transaction, rolling back')
+            try:
+                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+                    logging.error('Could not rollback transaction')
+            except Exception: # pylint: disable=broad-exception-caught
+                logging.exception('Could not rollback transaction')
+        else:
+            traceback.print_exc()
+        sys.exit(1)

+    finally:
+        lyr_cache = None
+        dso = None
+        extent = None
+        srs = None
+        sys.exit(rv)
+
+gdal.UseExceptions()
+main()
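The staleness test introduced above reduces to an integer comparison in nanoseconds: a cache entry is stale as soon as the mtime or ctime of any source file exceeds the cached last_updated value, and equality counts as fresh ("equal or lower than the `last_updated` value", per the docstring). A minimal standalone sketch of that comparison, with illustrative names only (is_stale and its arguments are not code from this commit):

    import os
    from datetime import datetime

    def is_stale(last_updated: datetime, paths: list[str]) -> bool:
        # Scale the cached timestamp (microsecond resolution) to integer
        # nanoseconds, mirroring areSourceFilesNewer(), so the comparison
        # against st_mtime_ns/st_ctime_ns avoids float rounding.
        cache_ns = int(last_updated.timestamp() * 1000000.) * 1000
        for path in paths:
            try:
                st = os.stat(path)
            except OSError:
                return True  # missing/unreadable source: treat as stale
            # max(mtime, ctime): an update made after the shared lock was
            # acquired necessarily yields a larger value
            if max(st.st_mtime_ns, st.st_ctime_ns) > cache_ns:
                return True
        return False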