diff options
Diffstat (limited to 'import_source.py')
-rw-r--r-- | import_source.py | 205 |
1 files changed, 188 insertions, 17 deletions
diff --git a/import_source.py b/import_source.py index 2d2f116..c4cb96f 100644 --- a/import_source.py +++ b/import_source.py @@ -26,8 +26,10 @@ import re from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC -from typing import Any, Optional +from typing import Any, Final, Optional import traceback +from hashlib import sha256 +import struct from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( @@ -439,7 +441,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, cachedir : Path|None = None, extent : ogr.Geometry|None = None, dsoTransaction : bool = False, - lyrcache : ogr.Layer|None = None) -> bool: + lyrcache : ogr.Layer|None = None, + force : bool = False) -> bool: """Clear lyr and import source layers to it.""" layername = lyr.GetName() @@ -461,6 +464,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, rv = True now = datetime.now().astimezone() + fingerprint = sha256() + fingerprint_map = fingerprintLayerDefn(fingerprint, lyr) try: clearLayer(dso, lyr) # TODO conditional (only if not new)? 
@@ -468,17 +473,26 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, _importSource(lyr, **source['source'], args=source['import'], cachedir=cachedir, - extent=extent) + extent=extent, + fingerprint=fingerprint, + fingerprint_map=fingerprint_map) + # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed dso.FlushCache() + fingerprint = fingerprint.digest() if lyrcache is not None: - updateLayerCache(layername=layername, - cache=lyrcache, - ds=dso, - savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None, - last_updated=now) + if not updateLayerCache(layername=layername, + cache=lyrcache, + ds=dso, lyr=lyr, + force=force, + lyrTransaction=lyrTransaction, + last_updated=now, + fingerprint=fingerprint): + if isinstance(lyrTransaction, bool): + # the transaction on lyr was already rolled back + lyrTransaction = False except Exception: # pylint: disable=broad-exception-caught rv = False @@ -528,11 +542,13 @@ def _importSource(lyr : ogr.Layer, unar : dict[str,Any]|None = None, args : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: + extent : ogr.Geometry|None = None, + fingerprint = None, fingerprint_map = None) -> None: """Import a source layer""" if unar is None: return _importSource2(lyr, path, args=args, - basedir=cachedir, extent=extent) + basedir=cachedir, extent=extent, + fingerprint=fingerprint, fingerprint_map=fingerprint_map) ds_srcpath = Path(args['path']) if ds_srcpath.is_absolute(): @@ -549,7 +565,8 @@ def _importSource(lyr : ogr.Layer, patterns=unar.get('patterns', None), exact_matches=[ds_srcpath]) return _importSource2(lyr, ds_srcpath, args=args, - basedir=Path(tmpdir), extent=extent) + basedir=Path(tmpdir), extent=extent, + fingerprint=fingerprint, fingerprint_map=fingerprint_map) def setFieldMapValue(fld : ogr.FieldDefn, idx : int, @@ -581,7 +598,8 @@ def setFieldMapValue(fld : ogr.FieldDefn, # pylint: disable-next=too-many-branches, too-many-locals, 
too-many-statements def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], - basedir : Path|None, extent : ogr.Geometry|None) -> None: + basedir : Path|None, extent : ogr.Geometry|None, + fingerprint, fingerprint_map) -> None: """Import a source layer (already extracted) This is more or less like ogr2ogr/GDALVectorTranslate() but we roll out our own (slower) version because GDALVectorTranslate() insists in @@ -638,7 +656,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], defn = lyr.GetLayerDefn() geomFieldCount = defn.GetGeomFieldCount() - if geomFieldCount != 1: # TODO Add support for multiple geometry fields + if geomFieldCount != 1: + # TODO Add support for multiple geometry fields (also in the fingerprinting logic below) logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount) fieldCount = defn.GetFieldCount() @@ -821,6 +840,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented') feature2.SetGeometryDirectly(geom) + fingerprintFeature(fingerprint, feature2, fingerprint_map) + if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}') @@ -847,10 +868,14 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) -def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, - last_updated : datetime, - savepoint : str|None = None) -> None: - """Update attributes in the layer cache for the given layer name.""" +def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, + layername : str, cache : ogr.Layer, + last_updated : datetime, fingerprint : bytes, + lyrTransaction : str|bool|None = None, + force : bool = False) -> bool: + """Update attributes in the layer cache for the given layer name. 
+ Return a boolean indicating whether changes to layername were *not* + rolled back (hence might still be outstanding in the transaction).""" attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) cache.SetAttributeFilter(attributeFilter) @@ -862,9 +887,11 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, update = False feature = ogr.Feature(cache.GetLayerDefn()) feature.SetFieldString(0, layername) + fingerprint_old = None else: logging.debug('Updating existing feature in layer cache for %s', attributeFilter) update = True + fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None assert cache.GetNextFeature() is None if not gdalVersionMin(maj=3, min=8): @@ -887,8 +914,44 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, last_updated.minute, float(last_updated.second) + float(last_updated.microsecond)/1000000., tzFlag) + if fingerprint is None: + feature.SetFieldNull(2) + else: + # https://lists.osgeo.org/pipermail/gdal-dev/2020-December/053170.html + feature.SetFieldBinaryFromHexString(2, fingerprint.hex()) + ret = True if update: + if not (force or fingerprint is None or fingerprint_old is None or + fingerprint != fingerprint_old): + # no change: rollback (*before* updating the cache) if possible to retain FID values + if isinstance(lyrTransaction, str): + logging.info('Updated layer "%s" has identical fingerprint %s, rolling back', + layername, fingerprint.hex()[:8]) + query = 'ROLLBACK TO ' + lyrTransaction + logging.debug('%s', query) + try: + ds.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not execute SQL: %s', query) + else: + ret = False + elif isinstance(lyrTransaction, bool) and lyrTransaction: + logging.info('Updated layer "%s" has identical fingerprint %s, rolling back', + layername, fingerprint.hex()[:8]) + 
# Packers used to serialize values into the fingerprint hash.
# NOTE: the '@' prefix selects the platform's native sizes and alignment, so
# the resulting fingerprints are only comparable between runs that share the
# same platform/ABI.
SERIALIZE_BOOL      : Final = struct.Struct('@?').pack
SERIALIZE_SHORT     : Final = struct.Struct('@h').pack
SERIALIZE_USHORT    : Final = struct.Struct('@H').pack
SERIALIZE_LONG      : Final = struct.Struct('@l').pack
SERIALIZE_ULONG     : Final = struct.Struct('@L').pack
SERIALIZE_LONGLONG  : Final = struct.Struct('@q').pack
SERIALIZE_ULONGLONG : Final = struct.Struct('@Q').pack
SERIALIZE_FLOAT     : Final = struct.Struct('@f').pack
SERIALIZE_DOUBLE    : Final = struct.Struct('@d').pack
SERIALIZE_DATE      : Final = struct.Struct('@HBB').pack
SERIALIZE_DATETIME  : Final = struct.Struct('@HBBBBfB').pack
SERIALIZE_TIME      : Final = struct.Struct('@BBfB').pack

# Length prefix value announcing that the real length follows as an unsigned
# long long (see _fingerprintBinary()).
_BINARY_LENGTH_ESCAPE : Final = 0xFFFF

def _fingerprintBinary(m, v : bytes) -> None:
    """Feed the length-prefixed byte string v to the hash object m.

    Values shorter than 65535 bytes get an unsigned short length prefix.
    Longer values, which cannot be packed into an unsigned short, get the
    escape value 0xFFFF followed by the length as an unsigned long long, so
    that arbitrarily long strings/blobs can be fingerprinted without
    making the encoding ambiguous."""
    n = len(v)
    if n < _BINARY_LENGTH_ESCAPE:
        m.update(SERIALIZE_USHORT(n))
    else:
        m.update(SERIALIZE_USHORT(_BINARY_LENGTH_ESCAPE))
        m.update(SERIALIZE_ULONGLONG(n))
    m.update(v)

def _fingerprintDate(m, f : 'ogr.Feature', i : int) -> None:
    """Feed the (year, month, day) part of field #i of feature f to m."""
    year, month, day, *_ = f.GetFieldAsDateTime(i)
    m.update(SERIALIZE_DATE(year, month, day))

def _fingerprintDateTime(m, f : 'ogr.Feature', i : int) -> None:
    """Feed the date, time and timezone flag of field #i of feature f to m."""
    year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
    m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))

def _fingerprintTime(m, f : 'ogr.Feature', i : int) -> None:
    """Feed the time and timezone flag of field #i of feature f to m."""
    _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
    m.update(SERIALIZE_TIME(hour, minute, second, tz))

def _fingerprintUUID(m, f : 'ogr.Feature', i : int) -> None:
    """Feed the raw bytes of the UUID held by field #i of feature f to m.

    bytes.fromhex() raises ValueError if the value, once stripped of its
    dashes, is not a hexadecimal string."""
    # no length prefix: a well-formed UUID is always 16 bytes long (this is
    # not enforced here)
    m.update(bytes.fromhex(f.GetFieldAsString(i).replace('-', '')))

def fingerprintLayerDefn(m, lyr : 'ogr.Layer') -> tuple[Any, ...]:
    """Fingerprint a layer definition (schema) and return the
    fingerprinting recipe mapping for its features.

    The number of fields, then the name, type, subtype and width of each
    field, and finally the layer's geometry type are fed to the hash object
    m.  The returned tuple holds one callable per field, to be called as
    fn(m, feature, index) in order to fingerprint that field's value; it is
    meant to be passed to fingerprintFeature().

    Raise RuntimeError if the layer has more than 65535 fields or if a field
    has a (type, subtype) combination for which no serialization exists."""
    defn = lyr.GetLayerDefn()

    n = defn.GetFieldCount()
    if n > 65535:
        # the field count is serialized as an unsigned short
        raise RuntimeError(f'Too many fields {n}')
    m.update(SERIALIZE_USHORT(n))

    ret = [None] * n
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        _fingerprintBinary(m, fld.GetName().encode('utf-8'))

        fldType = fld.GetType()
        m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13

        fldSubType = fld.GetSubType()
        m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5

        m.update(SERIALIZE_LONG(fld.GetWidth()))

        # the lambdas only use their own parameters, hence are not subject
        # to late binding of the loop variables
        if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
        elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
        elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
        elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
        elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
        elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
            ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
        elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
            ret[i] = _fingerprintUUID
        elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
            ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
        elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
            ret[i] = _fingerprintDate
        elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
            ret[i] = _fingerprintTime
        elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
            ret[i] = _fingerprintDateTime
        elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
            ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))

        if ret[i] is None:
            raise RuntimeError('Serialization not implemented for type '
                               + ogr.GetFieldTypeName(fldType) + ', subtype '
                               + ogr.GetFieldSubTypeName(fldSubType))

    # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
    m.update(SERIALIZE_LONG(lyr.GetGeomType()))
    return tuple(ret)

def fingerprintFeature(m, feature : 'ogr.Feature', x : tuple[Any, ...]) -> None:
    """Fingerprint a feature using the given fingerprinting recipe.

    x is the recipe returned by fingerprintLayerDefn(): one callable per
    field, called as fn(m, feature, index).  Each field contributes a
    presence flag to the hash object m, followed by its serialized value
    when the field is set and not NULL.  The geometry is then appended as
    length-prefixed ISO WKB; a feature without geometry contributes a zero
    length instead."""
    for i, f in enumerate(x):
        # Unset and NULL fields are deliberately hashed alike.  We could use
        # a bitmap to serialize the flags in ⸢n/8⸣ bytes instead of n bytes,
        # but there are no builtin functions for that and since we're
        # hashing anyway it's not worth the trouble.
        if feature.IsFieldSetAndNotNull(i):
            m.update(SERIALIZE_BOOL(True))
            f(m, feature, i)
        else:
            m.update(SERIALIZE_BOOL(False))

    geom = feature.GetGeometryRef()
    if geom is None:
        # No geometry: a WKB blob always starts with a byte order marker and
        # a geometry type, hence is never empty and a zero length can't
        # collide with an actual geometry.
        m.update(SERIALIZE_ULONG(0))
    else:
        wkb = geom.ExportToIsoWkb()
        m.update(SERIALIZE_ULONG(len(wkb)))
        m.update(wkb)