diff options
Diffstat (limited to 'import_source.py')
| -rw-r--r-- | import_source.py | 100 |
1 files changed, 70 insertions, 30 deletions
diff --git a/import_source.py b/import_source.py index 13a8e6c..1271981 100644 --- a/import_source.py +++ b/import_source.py @@ -26,7 +26,7 @@ import re from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC -from typing import Any, Final, Iterator, Optional +from typing import Any, Callable, Final, Iterator, Optional import traceback from enum import Enum, unique as enum_unique from hashlib import sha256 @@ -37,6 +37,7 @@ from osgeo.gdalconst import ( OF_ALL as GDAL_OF_ALL, OF_READONLY as GDAL_OF_READONLY, OF_UPDATE as GDAL_OF_UPDATE, + OF_VECTOR as GDAL_OF_VECTOR, OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR, DCAP_CREATE as GDAL_DCAP_CREATE, ) @@ -46,7 +47,6 @@ from common import BadConfiguration, escape_identifier, escape_literal_str from common_gdal import ( gdalSetOpenExArgs, gdalGetMetadataItem, - gdalVersionMin, formatTZFlag, getSpatialFilterFromGeometry, ) @@ -56,7 +56,8 @@ def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset: create-options is a non-empty dictionary.""" path = def_dict['path'] - kwargs, drv = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR) + kwargs, drv = gdalSetOpenExArgs(def_dict, + flags=GDAL_OF_VECTOR|GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR) try: logging.debug('OpenEx(%s, %s)', path, str(kwargs)) return gdal.OpenEx(path, **kwargs) @@ -148,9 +149,6 @@ def createOutputLayer(ds : gdal.Dataset, lyr = ds.CreateLayer(layername, **kwargs) if lyr is None: raise RuntimeError(f'Could not create destination layer "{layername}"') - # TODO use CreateLayerFromGeomFieldDefn() from ≥v3.9 as it's not - # possible to toggle the geomfield's nullable property after fact - # otherwise fields = options['fields'] if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField): @@ -216,9 +214,20 @@ def createOutputLayer(ds : gdal.Dataset, defn.SetUnique(v) if lyr.CreateField(defn, approx_ok=False) != gdalconst.CE_None: - raise RuntimeError('Could not create field "{fldName}"') + raise RuntimeError(f'Could not create field "{fldName}"') logging.debug('Added field "%s" to output layer "%s"', fldName, layername) + if lyr.TestCapability(ogr.OLCAlterGeomFieldDefn): + # it appears using .CreateLayerFromGeomFieldDefn() on a a non-nullable + # GeomFieldDefn doesn't do anything, so we alter it after the fact instead + # (GPKG doesn't support this, use GEOMETRY_NULLABLE=NO in layer creation + # options instead) + flags = drv.GetMetadataItem(gdal.DMD_ALTER_GEOM_FIELD_DEFN_FLAGS) + if flags is not None and 'nullable' in flags.lower().split(' '): + geom_field = ogr.GeomFieldDefn(None, geom_type) + geom_field.SetNullable(False) + lyr.AlterGeomFieldDefn(0, geom_field, ogr.ALTER_GEOM_FIELD_DEFN_NULLABLE_FLAG) + # sync before calling StartTransaction() so we're not trying to rollback changes # on a non-existing table lyr.SyncToDisk() @@ -252,15 +261,29 @@ def validateOutputLayer(lyr : ogr.Layer, layerDefn = lyr.GetLayerDefn() n = layerDefn.GetGeomFieldCount() if n != 1: + if n == 0: + raise RuntimeError(f'Output layer "{lyr.GetName()}" has no geometry fields') logging.warning('Output layer "%s" has %d != 1 geometry fields', lyr.GetName(), n) - geom_type1 = lyr.GetGeomType() - geom_type2 = options['geometry-type'] - if geom_type1 != geom_type2: + iGeomField = 0 + geomField = layerDefn.GetGeomFieldDefn(iGeomField) + geomType = geomField.GetType() + logging.debug('Geometry column #%d: name="%s\", type="%s", srs=%s, nullable=%s', + iGeomField, geomField.GetName(), + ogr.GeometryTypeToName(geomType), + '-' if geomField.GetSpatialRef() is None + else '"' + geomField.GetSpatialRef().GetName() + '"', + bool(geomField.IsNullable())) + if geomField.IsNullable(): + logging.warning('Geometry column #%d "%s" of output layer "%s" is nullable', + iGeomField, geomField.GetName(), lyr.GetName()) + + geomType2 = options['geometry-type'] + if geomType != geomType2: logging.warning('Output layer "%s" has geometry type #%d (%s), expected #%d (%s)', lyr.GetName(), - geom_type1, ogr.GeometryTypeToName(geom_type1), - geom_type2, ogr.GeometryTypeToName(geom_type2)) + geomType, ogr.GeometryTypeToName(geomType), + geomType2, ogr.GeometryTypeToName(geomType2)) ok = False fields = options.get('fields', None) @@ -362,7 +385,7 @@ def validateOutputLayer(lyr : ogr.Layer, return ok -def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None: +def clearLayer(lyr : ogr.Layer) -> None: """Clear the given layer (wipe all its features)""" n = -1 if lyr.TestCapability(ogr.OLCFastFeatureCount): @@ -372,7 +395,7 @@ def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None: return layername_esc = escape_identifier(lyr.GetName()) - # XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer + ds = lyr.GetDataset() drv = ds.GetDriver() if drv.ShortName == 'PostgreSQL': # https://www.postgresql.org/docs/15/sql-truncate.html @@ -453,7 +476,7 @@ class ImportStatus(Enum): return self.name.removeprefix('IMPORT_') # pylint: disable-next=dangerous-default-value -def importSources(dso : gdal.Dataset, lyr : ogr.Layer, +def importSources(lyr : ogr.Layer, sources : dict[str,Any] = {}, cachedir : Path|None = None, extent : ogr.Geometry|None = None, @@ -462,6 +485,7 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, force : bool = False) -> ImportStatus: """Clear lyr and import source layers to it.""" + dso = lyr.GetDataset() layername = lyr.GetName() if dsoTransaction: # declare a SAVEPOINT (nested transaction) within the DS-level transaction @@ -482,13 +506,14 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, rv = ImportStatus.IMPORT_NOCHANGE now = datetime.now().astimezone() try: - clearLayer(dso, lyr) # TODO conditional (only if not new)? + clearLayer(lyr) # TODO conditional (only if not new)? for source in sources: - _importSource(lyr, **source['source'], + importSource0(lyr, **source['source'], args=source['import'], cachedir=cachedir, - extent=extent) + extent=extent, + callback=_importSource2) # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed @@ -497,7 +522,7 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, if lyrcache is None: rv = ImportStatus.IMPORT_SUCCESS elif updateLayerCache(cache=lyrcache, - ds=dso, lyr=lyr, + lyr=lyr, force=force, lyrTransaction=lyrTransaction, last_updated=now): @@ -550,15 +575,17 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, return rv # pylint: disable-next=dangerous-default-value -def _importSource(lyr : ogr.Layer, +def importSource0(lyr : ogr.Layer|None = None, path : str = '/nonexistent', unar : dict[str,Any]|None = None, args : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: + extent : ogr.Geometry|None = None, + callback : Callable[[ogr.Layer|None, str, dict[str,Any], Path|None, + ogr.Geometry|None], None]|None = None) -> None: """Import a source layer""" if unar is None: - return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent) + return callback(lyr, path, args=args, basedir=cachedir, extent=extent) ds_srcpath = Path(args['path']) if ds_srcpath.is_absolute(): @@ -574,7 +601,7 @@ def _importSource(lyr : ogr.Layer, fmt=unar.get('format', None), patterns=unar.get('patterns', None), exact_matches=[ds_srcpath]) - return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent) + return callback(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent) def setFieldMapValue(fld : ogr.FieldDefn, idx : int, @@ -613,7 +640,7 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403 while we want a single transaction for the entire desination layer, including truncation, source imports, and metadata changes.""" - kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) + kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_VECTOR|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) path2 = path if basedir is None else str(basedir.joinpath(path)) logging.debug('OpenEx(%s, %s)', path2, str(kwargs)) @@ -758,6 +785,15 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], if nullReplacement is not None or len(mapping) > 0: valueMap.append( (i, nullReplacement, mapping) ) + if args.get('rstrip-strings', False): + stringFieldsIdx = [ i for i in range(fieldCount) + if defn.GetFieldDefn(i).GetType() == ogr.OFTString and + fieldMap[i] >= 0 ] + logging.debug('Source field indices to rstrip: %s', str(stringFieldsIdx)) + bStringFields = len(stringFieldsIdx) > 0 + else: + bStringFields = False + bValueMap = len(valueMap) > 0 defn = None @@ -774,6 +810,12 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], mismatch = {} feature = lyr.GetNextFeature() while feature is not None: + if bStringFields: + for i in stringFieldsIdx: + if feature.IsFieldSetAndNotNull(i): + v = feature.GetField(i) + feature.SetField(i, v.rstrip()) + if bValueMap: for i, nullReplacement, mapping in valueMap: if not feature.IsFieldSet(i): @@ -893,7 +935,7 @@ def listFieldsOrderBy(defn : ogr.FeatureDefn, yield c # pylint: disable-next=too-many-branches, too-many-statements -def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer, +def updateLayerCache(lyr : ogr.Layer, cache : ogr.Layer, last_updated : datetime, lyrTransaction : str|bool|None = None, force : bool = False) -> bool: @@ -941,6 +983,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer, struct_dgst : Final = struct.Struct('@qq').pack logging.debug('%s', query) + ds = lyr.GetDataset() lyr2 = ds.ExecuteSQL(query) try: assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties' @@ -973,9 +1016,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer, fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None assert cache.GetNextFeature() is None - if not gdalVersionMin(maj=3, min=8): - tzFlag = 0 # ogr.TZFLAG_UNKNOWN - elif last_updated.tzinfo == UTC: + if last_updated.tzinfo == UTC: tzFlag = ogr.TZFLAG_UTC else: td = last_updated.utcoffset() @@ -1036,8 +1077,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer, logging.info('Updated layer "%s" has identical fingerprint %s', layername, fingerprint.hex()[:8]) - # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead - if cache.SetFeature(feature) != ogr.OGRERR_NONE: + if cache.UpdateFeature(feature, [1,2], [], False) != ogr.OGRERR_NONE: raise RuntimeError('Could not update feature in layer cache') else: if cache.CreateFeature(feature) != ogr.OGRERR_NONE: |
