#!/usr/bin/python3 #---------------------------------------------------------------------- # Backend utilities for the Klimatanalys Norr project (import source layers) # Copyright © 2024-2025 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- # pylint: disable=invalid-name, missing-module-docstring, fixme import logging import tempfile import re from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC from typing import Any, Optional import traceback from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( OF_ALL as GDAL_OF_ALL, OF_READONLY as GDAL_OF_READONLY, OF_UPDATE as GDAL_OF_UPDATE, OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR, DCAP_CREATE as GDAL_DCAP_CREATE, ) from osgeo import gdalconst from common import BadConfiguration, escape_identifier, escape_literal_str from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, formatTZFlag def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset: """Open and return the output DS. It is created if create=False or create-options is a non-empty dictionary.""" path = def_dict['path'] kwargs, drv = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR) try: logging.debug('OpenEx(%s, %s)', path, str(kwargs)) return gdal.OpenEx(path, **kwargs) except RuntimeError as e: if not (gdal.GetLastErrorType() >= gdalconst.CE_Failure and gdal.GetLastErrorNo() == gdalconst.CPLE_OpenFailed): # not an open error raise e dso2 = None try: dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL | GDAL_OF_UPDATE) except RuntimeError: pass if dso2 is not None: # path exists but can't be open with OpenEx(path, **kwargs) raise e try: dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL) except RuntimeError: pass if dso2 is not None: # path exists but can't be open with OpenEx(path, **kwargs) raise e dsco = def_dict.get('create-options', None) if not def_dict.get('create', False) and dsco is None: # not configured for creation raise e if drv is None or not gdalGetMetadataItem(drv, GDAL_DCAP_CREATE): # not capable of creation raise e if 'open_options' in kwargs: # like ogr2ogr(1) logging.warning('Destination\'s open options ignored ' 'when creating the output datasource') kwargs2 = { 'eType': gdalconst.GDT_Unknown } if dsco is not None: kwargs2['options'] = [ k + '=' + str(v) for k, v in dsco.items() ] logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs2['eType'], ', options=' + str(kwargs2['options']) if 'options' in kwargs2 else '') # XXX racy, a GDAL equivalent of O_EXCL would be nice return drv.Create(path, 0, 0, 0, **kwargs2) def createOutputLayer(ds : gdal.Dataset, layername : str, srs : Optional[osr.SpatialReference] = None, options : dict[str, Any]|None = None) -> ogr.Layer: """Create output layer.""" if options is None or len(options) < 1: raise BadConfiguration(f'Missing schema for new output layer "{layername}"') logging.info('Creating new destination layer "%s"', layername) geom_type = options['geometry-type'] lco = options.get('options', None) drv = ds.GetDriver() if geom_type != ogr.wkbNone and drv.ShortName == 'PostgreSQL': # “Important to set to 2 for 2D layers as it has constraints on the geometry # dimension during loading.” # — https://gdal.org/drivers/vector/pg.html#layer-creation-options if ogr.GT_HasM(geom_type): if ogr.GT_HasZ(geom_type): dim = 'XYZM' else: dim = 'XYM' elif ogr.GT_HasZ(geom_type): dim = '3' else: dim = '2' if lco is None: lco = [] lco = ['dim=' + dim] + lco # prepend dim= kwargs = { 'geom_type': geom_type } if srs is not None: kwargs['srs'] = srs if lco is not None: kwargs['options'] = lco logging.debug('CreateLayer(%s, geom_type="%s"%s%s)', layername, ogr.GeometryTypeToName(geom_type), ', srs="' + kwargs['srs'].GetName() + '"' if 'srs' in kwargs else '', ', options=' + str(kwargs['options']) if 'options' in kwargs else '') lyr = ds.CreateLayer(layername, **kwargs) if lyr is None: raise RuntimeError(f'Could not create destination layer "{layername}"') # TODO use CreateLayerFromGeomFieldDefn() from ≥v3.9 as it's not # possible to toggle the geomfield's nullable property after fact # otherwise fields = options['fields'] if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField): raise RuntimeError(f'Destination layer "{layername}" lacks field creation capability') # set up output schema for fld in fields: fldName = fld['Name'] defn = ogr.FieldDefn() defn.SetName(fldName) if 'AlternativeName' in fld: v = fld['AlternativeName'] logging.debug('Set AlternativeName="%s" on output field "%s"', str(v), fldName) defn.SetAlternativeName(v) if 'Comment' in fld: v = fld['Comment'] logging.debug('Set Comment="%s" on output field "%s"', str(v), fldName) defn.SetComment(v) if 'Type' in fld: v = fld['Type'] logging.debug('Set Type=%d (%s) on output field "%s"', v, ogr.GetFieldTypeName(v), fldName) defn.SetType(v) if 'SubType' in fld: v = fld['SubType'] logging.debug('Set SubType=%d (%s) on output field "%s"', v, ogr.GetFieldSubTypeName(v), fldName) defn.SetSubType(v) if 'TZFlag' in fld: v = fld['TZFlag'] logging.debug('Set TZFlag=%d (%s) on output field "%s"', v, formatTZFlag(v), fldName) defn.SetTZFlag(v) if 'Precision' in fld: v = fld['Precision'] logging.debug('Set Precision=%d on output field "%s"', v, fldName) defn.SetPrecision(v) if 'Width' in fld: v = fld['Width'] logging.debug('Set Width=%d on output field "%s"', v, fldName) defn.SetWidth(v) if 'Default' in fld: v = fld['Default'] logging.debug('Set Default=%s on output field "%s"', v, fldName) defn.SetDefault(v) if 'Nullable' in fld: v = fld['Nullable'] logging.debug('Set Nullable=%s on output field "%s"', v, fldName) defn.SetNullable(v) if 'Unique' in fld: v = fld['Unique'] logging.debug('Set Unique=%s on output field "%s"', v, fldName) defn.SetUnique(v) if lyr.CreateField(defn, approx_ok=False) != gdalconst.CE_None: raise RuntimeError('Could not create field "{fldName}"') logging.debug('Added field "%s" to output layer "%s"', fldName, layername) # sync before calling StartTransaction() so we're not trying to rollback changes # on a non-existing table lyr.SyncToDisk() return lyr # pylint: disable-next=too-many-branches def validateOutputLayer(lyr : ogr.Layer, srs : Optional[osr.SpatialReference] = None, options : Optional[dict[str, Any]] = None) -> bool: """Validate the output layer against the provided SRS and creation options.""" ok = True # ensure the output SRS is equivalent if srs is not None: srs2 = lyr.GetSpatialRef() # cf. apps/ogr2ogr_lib.cpp srs_options = [ 'IGNORE_DATA_AXIS_TO_SRS_AXIS_MAPPING=YES', 'CRITERION=EQUIVALENT' ] if not srs.IsSame(srs2, srs_options): logging.warning('Output layer "%s" has SRS %s,\nexpected %s', lyr.GetName(), srs2.ExportToPrettyWkt(), srs.ExportToPrettyWkt()) ok = False if options is None: return ok layerDefn = lyr.GetLayerDefn() n = layerDefn.GetGeomFieldCount() if n != 1: logging.warning('Output layer "%s" has %d != 1 geometry fields', lyr.GetName(), n) geom_type1 = lyr.GetGeomType() geom_type2 = options['geometry-type'] if geom_type1 != geom_type2: logging.warning('Output layer "%s" has geometry type #%d (%s), expected #%d (%s)', lyr.GetName(), geom_type1, ogr.GeometryTypeToName(geom_type1), geom_type2, ogr.GeometryTypeToName(geom_type2)) ok = False fields = options.get('fields', None) if fields is not None: for fld in fields: fldName = fld['Name'] idx = layerDefn.GetFieldIndex(fldName) if idx < 0: logging.warning('Output layer "%s" has no field named "%s"', lyr.GetName(), fldName) ok = False continue defn = layerDefn.GetFieldDefn(idx) if 'AlternativeName' in fld: v1 = defn.GetAlternativeName() v2 = fld['AlternativeName'] if v1 != v2: logging.warning('Field "%s" has AlternativeName="%s", expected "%s"', fldName, v1, v2) ok = False if 'Comment' in fld: v1 = defn.GetComment() v2 = fld['Comment'] if v1 != v2: logging.warning('Field "%s" has Comment="%s", expected "%s"', fldName, v1, v2) ok = False if 'Type' in fld: v1 = defn.GetType() v2 = fld['Type'] if v1 != v2: logging.warning('Field "%s" has Type=%d (%s), expected %d (%s)', fldName, v1, ogr.GetFieldTypeName(v1), v2, ogr.GetFieldTypeName(v2)) ok = False if 'SubType' in fld: v1 = defn.GetSubType() v2 = fld['SubType'] if v1 != v2: logging.warning('Field "%s" has SubType=%d (%s), expected %d (%s)', fldName, v1, ogr.GetFieldSubTypeName(v1), v2, ogr.GetFieldSubTypeName(v2)) ok = False if 'TZFlag' in fld: v1 = defn.GetTZFlag() v2 = fld['TZFlag'] if v1 != v2: logging.warning('Field "%s" has TZFlag=%d (%s), expected %d (%s)', fldName, v1, formatTZFlag(v1), v2, formatTZFlag(v2)) ok = False if 'Precision' in fld: v1 = defn.GetPrecision() v2 = fld['Precision'] if v1 != v2: logging.warning('Field "%s" has Precision=%d, expected %d', fldName, v1, v2) ok = False if 'Width' in fld: v1 = defn.GetWidth() v2 = fld['Width'] if v1 != v2: logging.warning('Field "%s" has Width=%d, expected %d', fldName, v1, v2) ok = False if 'Default' in fld: v1 = defn.GetDefault() v2 = fld['Default'] if v1 != v2: logging.warning('Field "%s" has Default="%s", expected "%s"', fldName, v1, v2) ok = False if 'Nullable' in fld: v1 = bool(defn.IsNullable()) v2 = fld['Nullable'] if v1 != v2: logging.warning('Field "%s" has Nullable=%s, expected %s', fldName, v1, v2) ok = False if 'Unique' in fld: v1 = bool(defn.IsUnique()) v2 = fld['Unique'] if v1 != v2: logging.warning('Field "%s" has Unique=%s, expected %s', fldName, v1, v2) ok = False return ok def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None: """Clear the given layer (wipe all its features)""" n = -1 if lyr.TestCapability(ogr.OLCFastFeatureCount): n = lyr.GetFeatureCount(force=0) if n == 0: # nothing to clear, we're good return layername_esc = escape_identifier(lyr.GetName()) # XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer drv = ds.GetDriver() if drv.ShortName == 'PostgreSQL': # https://www.postgresql.org/docs/15/sql-truncate.html query = 'TRUNCATE TABLE {table} CONTINUE IDENTITY RESTRICT' op = 'Truncating' else: query = 'DELETE FROM {table}' op = 'Clearing' logging.info('%s table %s (former feature count: %s)', op, layername_esc, str(n) if n >= 0 else 'unknown') ds.ExecuteSQL(query.format(table=layername_esc)) def extractArchive(path : Path, destdir : str, fmt : str|None = None, patterns : list[str]|None = None, exact_matches : list[str]|None = None) -> None: """Extract an archive file into the given destination directory.""" if fmt is None: suffix = path.suffix if suffix is None or suffix == '' or not suffix.startswith('.'): raise RuntimeError(f'Could not infer archive format from "{path}"') fmt = suffix.removeprefix('.') fmt = fmt.lower() logging.debug('Unpacking %s archive %s into %s', fmt, path, destdir) if fmt == 'zip': import zipfile # pylint: disable=import-outside-toplevel logging.debug('Opening %s as ZipFile', path) with zipfile.ZipFile(path, mode='r') as z: namelist = listArchiveMembers(z.namelist(), patterns=patterns, exact_matches=exact_matches) z.extractall(path=destdir, members=namelist) else: raise RuntimeError(f'Unknown archive format "{fmt}"') def listArchiveMembers(namelist : list[str], patterns : list[str]|None = None, exact_matches : list[str]|None = None) -> list[str]: """List archive members matching the given parterns and/or exact matches.""" if patterns is None and exact_matches is None: # if neither patterns nor exact_matches are given we'll extract the entire archive return namelist if patterns is None: patterns = [] if exact_matches is None: exact_matches = [] members = [] for name in namelist: ok = False if name in exact_matches: # try exact matches first logging.debug('Listed archive member %s (exact match)', name) members.append(name) ok = True continue # if there are no exact matches, try patterns one by one in the supplied order for pat in patterns: if fnmatchcase(name, pat): logging.debug('Listed archive member %s (matching pattern "%s")', name, pat) members.append(name) ok = True break if not ok: logging.debug('Ignoring archive member %s', name) return members # pylint: disable-next=dangerous-default-value def importSources(dso : gdal.Dataset, lyr : ogr.Layer, sources : dict[str,Any] = {}, cachedir : Path|None = None, extent : ogr.Geometry|None = None, dsoTransaction : bool = False, lyrcache : ogr.Layer|None = None) -> bool: """Clear lyr and import source layers to it.""" layername = lyr.GetName() if dsoTransaction: # declare a SAVEPOINT (nested transaction) within the DS-level transaction lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername) logging.debug('%s', lyrTransaction) dso.ExecuteSQL(lyrTransaction) elif lyr.TestCapability(ogr.OLCTransactions): # try to start transaction on the layer logging.debug('Starting transaction on output layer "%s"', layername) lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE if not lyrTransaction: logging.warning('Couldn\'t start transaction on output layer "%s"', layername) else: logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions', layername) lyrTransaction = False rv = True now = datetime.now().astimezone() try: clearLayer(dso, lyr) # TODO conditional (only if not new)? for source in sources: _importSource(lyr, **source['source'], args=source['import'], cachedir=cachedir, extent=extent) # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed dso.FlushCache() if lyrcache is not None: updateLayerCache(layername=layername, cache=lyrcache, ds=dso, savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None, last_updated=now) except Exception: # pylint: disable=broad-exception-caught rv = False if isinstance(lyrTransaction, str): query = 'ROLLBACK TO ' + lyrTransaction logging.exception('Exception occured within transaction: %s', query) # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT try: dso.ExecuteSQL(query) except Exception: # pylint: disable=broad-exception-caught logging.exception('Could not execute SQL: %s', query) elif isinstance(lyrTransaction, bool) and lyrTransaction: logging.exception('Exception occured within transaction on output ' 'layer "%s": ROLLBACK', layername) lyrTransaction = None try: if lyr.RollbackTransaction() != ogr.OGRERR_NONE: logging.error('Could not rollback transaction on layer "%s"', layername) except Exception: # pylint: disable=broad-exception-caught logging.exception('Could not rollback transaction on layer "%s"', layername) else: traceback.print_exc() finally: if isinstance(lyrTransaction, str): query = 'RELEASE ' + lyrTransaction logging.debug('%s', query) try: dso.ExecuteSQL(query) except Exception: # pylint: disable=broad-exception-caught rv = False logging.exception('Could not execute SQL: %s', query) elif isinstance(lyrTransaction, bool) and lyrTransaction: try: if lyr.CommitTransaction() != ogr.OGRERR_NONE: rv = False logging.error('Could not commit transaction') except Exception: # pylint: disable=broad-exception-caught rv = False logging.exception('Could not commit transaction on layer "%s"', layername) return rv # pylint: disable-next=dangerous-default-value def _importSource(lyr : ogr.Layer, path : str = '/nonexistent', unar : dict[str,Any]|None = None, args : dict[str,Any] = {}, cachedir : Path|None = None, extent : ogr.Geometry|None = None) -> None: """Import a source layer""" if unar is None: return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent) ds_srcpath = Path(args['path']) if ds_srcpath.is_absolute(): # treat absolute paths as relative to the archive root logging.warning('%s is absolute, removing leading anchor', ds_srcpath) ds_srcpath = ds_srcpath.relative_to(ds_srcpath.anchor) ds_srcpath = str(ds_srcpath) with tempfile.TemporaryDirectory() as tmpdir: logging.debug('Created temporary directory %s', tmpdir) extractArchive(Path(path) if cachedir is None else cachedir.joinpath(path), tmpdir, fmt=unar.get('format', None), patterns=unar.get('patterns', None), exact_matches=[ds_srcpath]) return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent) def setFieldMapValue(fld : ogr.FieldDefn, idx : int, val : None|int|str|bytes|float) -> None|int|str|bytes|float: """Validate field value mapping.""" if val is None: if not fld.IsNullable(): logging.warning('Field "%s" is not NULLable but remaps NULL', fld.GetName()) return None fldType = fld.GetType() if fldType in (ogr.OFTInteger, ogr.OFTInteger64): if isinstance(val, int): return val elif fldType == ogr.OFTString: if isinstance(val, str): return val elif fldType == ogr.OFTBinary: if isinstance(val, bytes): return val elif fldType == ogr.OFTReal: if isinstance(val, int): return float(val) if isinstance(val, float): return val raise RuntimeError(f'Field "{fld.GetName()}" mapping #{idx} has incompatible type ' f'for {ogr.GetFieldTypeName(fldType)}') # pylint: disable-next=too-many-branches, too-many-locals, too-many-statements def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], basedir : Path|None, extent : ogr.Geometry|None) -> None: """Import a source layer (already extracted) This is more or less like ogr2ogr/GDALVectorTranslate() but we roll out our own (slower) version because GDALVectorTranslate() insists in calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403 while we want a single transaction for the entire desination layer, including truncation, source imports, and metadata changes.""" kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) path2 = path if basedir is None else str(basedir.joinpath(path)) logging.debug('OpenEx(%s, %s)', path2, str(kwargs)) ds = gdal.OpenEx(path2, **kwargs) if ds is None: raise RuntimeError(f'Could not open {path2}') layername = args.get('layername', None) if layername is None: idx = 0 lyr = ds.GetLayerByIndex(idx) msg = '#' + str(idx) if lyr is not None: layername = lyr.GetName() msg += ' ("' + layername + '")' else: lyr = ds.GetLayerByName(layername) msg = '"' + layername + '"' if lyr is None: raise RuntimeError(f'Could not get requested layer {msg} from {path2}') logging.info('Importing layer %s from "%s"', msg, path) canIgnoreFields = lyr.TestCapability(ogr.OLCIgnoreFields) srs = lyr.GetSpatialRef() if srs is None: raise RuntimeError('Source layer has no SRS') srs_dst = lyr_dst.GetSpatialRef() if srs_dst is None: logging.warning('Destination has no SRS, skipping coordinate transformation') ct = None elif srs_dst.IsSame(srs): logging.debug('Both source and destination have the same SRS (%s), ' 'skipping coordinate transformation', srs_dst.GetName()) ct = None else: # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0 # when possible, see apps/ogr2ogr_lib.cpp logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)', srs.GetName(), srs_dst.GetName()) ct = osr.CoordinateTransformation(srs, srs_dst) if ct is None: raise RuntimeError(f'Could not create transformation from source SRS ({srs.GetName()}) ' f'to destination SRS ({srs_dst.GetName()})') defn = lyr.GetLayerDefn() geomFieldCount = defn.GetGeomFieldCount() if geomFieldCount != 1: # TODO Add support for multiple geometry fields logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount) fieldCount = defn.GetFieldCount() fieldMap = [-1] * fieldCount fields = args['field-map'] fieldSet = set() ignoredFieldNames = [] for i in range(fieldCount): fld = defn.GetFieldDefn(i) fldName = fld.GetName() fieldMap[i] = v = fields.get(fldName, -1) fieldSet.add(fldName) if v < 0 and canIgnoreFields: # call SetIgnored() on unwanted source fields logging.debug('Set Ignored=True on output field "%s"', fldName) fld.SetIgnored(True) ignoredFieldNames.append(fldName) count0 = -1 if lyr.TestCapability(ogr.OLCFastFeatureCount): count0 = lyr.GetFeatureCount(force=0) if count0 == 0 and len(fieldSet) == 0: # skip the below warning in some cases (e.g., GeoJSON source) logging.info('Source layer "%s" has no fields nor features, skipping', layername) return logging.debug('Field map: %s', str(fieldMap)) for fld in fields: if not fld in fieldSet: logging.warning('Source layer "%s" has no field named "%s", ignoring', layername, fld) count1 = -1 if args.get('spatial-filter', True) and extent is not None: if extent.GetSpatialReference().IsSame(srs): extent2 = extent else: extent2 = extent.Clone() if extent2.TransformTo(srs) != ogr.OGRERR_NONE: raise RuntimeError(f'Could not transform extent {extent.ExportToWkt()} ' f'to {srs.GetName()}') #logging.debug('Applying extent: %s', extent2.ExportToWkt()) lyr.SetSpatialFilter(extent2) if lyr.TestCapability(ogr.OLCFastFeatureCount): count1 = lyr.GetFeatureCount(force=0) if count0 >= 0: if count1 >= 0: logging.info('Source layer "%s" has %d features (%d of which intersecting extent)', layername, count0, count1) else: logging.info('Source layer "%s" has %d features', layername, count0) logging.info('Ignored fields from source layer: %s', '-' if len(ignoredFieldNames) == 0 else ', '.join(ignoredFieldNames)) # build a list of triplets (field index, replacement_for_null, [(from_value, to_value), …]) valueMap = [] for fldName, rules in args.get('value-map', {}).items(): i = defn.GetFieldIndex(fldName) if i < 0: raise RuntimeError(f'Source layer "{layername}" has no field named "{fldName}"') if fieldMap[i] < 0: logging.warning('Ignored source field "%s" has value map', fldName) continue hasNullReplacement = False nullReplacement = None mapping = [] fld = defn.GetFieldDefn(i) for idx, (rFrom, rTo) in enumerate(rules): # use fld for both 'from' and 'to' (the types must match, casting is not # allowed in the mapping) if rFrom is None: if hasNullReplacement: logging.warning('Field "%s" has duplicate NULL replacement', fld.GetName()) else: setFieldMapValue(fld, idx, None) # validate NULL rTo = setFieldMapValue(fld, idx, rTo) hasNullReplacement = True nullReplacement = rTo elif isinstance(rFrom, re.Pattern): # validate but keep the rFrom regex setFieldMapValue(fld, idx, str(rFrom)) rTo = setFieldMapValue(fld, idx, rTo) mapping.append( (rFrom, rTo, 1) ) else: rFrom = setFieldMapValue(fld, idx, rFrom) rTo = setFieldMapValue(fld, idx, rTo) mapping.append( (rFrom, rTo, 0) ) if nullReplacement is not None or len(mapping) > 0: valueMap.append( (i, nullReplacement, mapping) ) bValueMap = len(valueMap) > 0 defn = None defn_dst = lyr_dst.GetLayerDefn() eGType_dst = defn_dst.GetGeomType() eGType_dst_HasZ = ogr.GT_HasZ(eGType_dst) eGType_dst_HasM = ogr.GT_HasM(eGType_dst) dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown if bValueMap: valueMapCounts = [0] * fieldCount featureCount = 0 mismatch = {} feature = lyr.GetNextFeature() while feature is not None: if bValueMap: for i, nullReplacement, mapping in valueMap: if not feature.IsFieldSet(i): continue if feature.IsFieldNull(i): if nullReplacement is not None: # replace NULL with non-NULL value feature.SetField(i, nullReplacement) valueMapCounts[i] += 1 continue v = feature.GetField(i) for rFrom, rTo, rType in mapping: if rType == 0: # literal if v != rFrom: continue elif rType == 1: # regex m = rFrom.fullmatch(v) if m is None: continue if rTo is not None: rTo = rTo.format(*m.groups()) else: raise RuntimeError(str(rType)) if rTo is None: # replace non-NULL value with NULL feature.SetFieldNull(i) else: # replace non-NULL value with non-NULL value feature.SetField(i, rTo) valueMapCounts[i] += 1 break feature2 = ogr.Feature(defn_dst) feature2.SetFromWithMap(feature, False, fieldMap) geom = feature2.GetGeometryRef() if geom is None: if eGType_dst != ogr.wkbNone: logging.warning('Source feature #%d has no geometry, trying to transfer anyway', feature.GetFID()) else: if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE: raise RuntimeError('Could not apply coordinate transformation') eGType = geom.GetGeometryType() if eGType != eGType_dst and not dGeomIsUnknown: # Promote to multi, cf. apps/ogr2ogr_lib.cpp:ConvertType() eGType2 = eGType if eGType in (ogr.wkbTriangle, ogr.wkbTIN, ogr.wkbPolyhedralSurface): eGType2 = ogr.wkbMultiPolygon elif not ogr.GT_IsSubClassOf(eGType, ogr.wkbGeometryCollection): eGType2 = ogr.GT_GetCollection(eGType) eGType2 = ogr.GT_SetModifier(eGType2, eGType_dst_HasZ, eGType_dst_HasM) if eGType2 == eGType_dst: mismatch[eGType] = mismatch.get(eGType, 0) + 1 geom = ogr.ForceTo(geom, eGType_dst) # TODO call MakeValid()? else: raise RuntimeError(f'Conversion from {ogr.GeometryTypeToName(eGType)} ' f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented') feature2.SetGeometryDirectly(geom) if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}') featureCount += 1 feature = lyr.GetNextFeature() if bValueMap: valueMapCounts = [ (lyr.GetLayerDefn().GetFieldDefn(i).GetName(), k) for i,k in enumerate(valueMapCounts) if k > 0 ] lyr = None logging.info('Imported %d features from source layer "%s"', featureCount, layername) if bValueMap: if len(valueMapCounts) > 0: valueMapCounts = ', '.join([ str(k) + '× "' + n + '"' for n,k in valueMapCounts ]) else: valueMapCounts = '-' logging.info('Field substitutions: %s', valueMapCounts) if len(mismatch) > 0: mismatches = [ str(n) + '× ' + ogr.GeometryTypeToName(t) for t,n in sorted(mismatch.items(), key=lambda x: x[1]) ] logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, last_updated : datetime, savepoint : str|None = None) -> None: """Update attributes in the layer cache for the given layer name.""" attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) cache.SetAttributeFilter(attributeFilter) feature = cache.GetNextFeature() if feature is None: # not in cache logging.debug('Creating new feature in layer cache for %s', attributeFilter) update = False feature = ogr.Feature(cache.GetLayerDefn()) feature.SetFieldString(0, layername) else: logging.debug('Updating existing feature in layer cache for %s', attributeFilter) update = True assert cache.GetNextFeature() is None if not gdalVersionMin(maj=3, min=8): tzFlag = 0 # ogr.TZFLAG_UNKNOWN elif last_updated.tzinfo == UTC: tzFlag = ogr.TZFLAG_UTC else: td = last_updated.utcoffset() # 15min increments/decrements per unit above/below UTC, cf. # https://gdal.org/en/stable/api/vector_c_api.html#c.OGR_TZFLAG_UTC tzFlag = td.days * 96 + td.seconds // 900 if timedelta(seconds=tzFlag*900) != td or abs(tzFlag) > 56: # max ±14:00 raise RuntimeError(f'Invalid UTC offset {td}') tzFlag += ogr.TZFLAG_UTC feature.SetField(1, last_updated.year, last_updated.month, last_updated.day, last_updated.hour, last_updated.minute, float(last_updated.second) + float(last_updated.microsecond)/1000000., tzFlag) if update: # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead if cache.SetFeature(feature) != ogr.OGRERR_NONE: raise RuntimeError('Could not update feature in layer cache') else: if cache.CreateFeature(feature) != ogr.OGRERR_NONE: raise RuntimeError('Could not create new feature in layer cache') # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed ds.FlushCache()