diff options
author | Guilhem Moulin <guilhem@fripost.org> | 2025-04-22 21:33:42 +0200 |
---|---|---|
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-05-01 15:29:51 +0200 |
commit | ad38438a0b980ee816e1573bf18362e72345fa4d (patch) | |
tree | 8a276f36710dab903155bbdff05f7b516f5a5a00 | |
parent | 12c3c9ddbf6b3aa36c70fa90477d6ae8e132a230 (diff) |
webmap-import: Fingerprint destination layers to detect changes.
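Comparing modification times is not enough since some sources (for
instance Naturvårdsverket's SCI_Rikstackande) are updated on the server
even though no objects are being added; the source layer remains
unchanged but the file differs because of OBJECTID changes we are not
interested in.  A SHA-256 fingerprint of the destination layer's
definition and imported features is now stored in the layer cache; when
a re-import yields the same fingerprint (and the import is not forced),
the changes to the layer are rolled back where possible so that existing
FID values are retained.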
-rw-r--r-- | .pylintrc | 1 | ||||
-rw-r--r-- | import_source.py | 205 | ||||
-rw-r--r-- | schema.sql | 3 | ||||
-rwxr-xr-x | webmap-import | 12 |
4 files changed, 200 insertions, 21 deletions
@@ -5,3 +5,4 @@ max-locals = 50 max-branches = 25 max-statements = 100 max-nested-blocks = 10 +max-module-lines = 1250 diff --git a/import_source.py b/import_source.py index 2d2f116..c4cb96f 100644 --- a/import_source.py +++ b/import_source.py @@ -26,8 +26,10 @@ import re from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC -from typing import Any, Optional +from typing import Any, Final, Optional import traceback +from hashlib import sha256 +import struct from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( @@ -439,7 +441,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, cachedir : Path|None = None, extent : ogr.Geometry|None = None, dsoTransaction : bool = False, - lyrcache : ogr.Layer|None = None) -> bool: + lyrcache : ogr.Layer|None = None, + force : bool = False) -> bool: """Clear lyr and import source layers to it.""" layername = lyr.GetName() @@ -461,6 +464,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, rv = True now = datetime.now().astimezone() + fingerprint = sha256() + fingerprint_map = fingerprintLayerDefn(fingerprint, lyr) try: clearLayer(dso, lyr) # TODO conditional (only if not new)? 
@@ -468,17 +473,26 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, _importSource(lyr, **source['source'], args=source['import'], cachedir=cachedir, - extent=extent) + extent=extent, + fingerprint=fingerprint, + fingerprint_map=fingerprint_map) + # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed dso.FlushCache() + fingerprint = fingerprint.digest() if lyrcache is not None: - updateLayerCache(layername=layername, - cache=lyrcache, - ds=dso, - savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None, - last_updated=now) + if not updateLayerCache(layername=layername, + cache=lyrcache, + ds=dso, lyr=lyr, + force=force, + lyrTransaction=lyrTransaction, + last_updated=now, + fingerprint=fingerprint): + if isinstance(lyrTransaction, bool): + # the transaction on lyr was already rolled back + lyrTransaction = False except Exception: # pylint: disable=broad-exception-caught rv = False @@ -528,11 +542,13 @@ def _importSource(lyr : ogr.Layer, unar : dict[str,Any]|None = None, args : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: + extent : ogr.Geometry|None = None, + fingerprint = None, fingerprint_map = None) -> None: """Import a source layer""" if unar is None: return _importSource2(lyr, path, args=args, - basedir=cachedir, extent=extent) + basedir=cachedir, extent=extent, + fingerprint=fingerprint, fingerprint_map=fingerprint_map) ds_srcpath = Path(args['path']) if ds_srcpath.is_absolute(): @@ -549,7 +565,8 @@ def _importSource(lyr : ogr.Layer, patterns=unar.get('patterns', None), exact_matches=[ds_srcpath]) return _importSource2(lyr, ds_srcpath, args=args, - basedir=Path(tmpdir), extent=extent) + basedir=Path(tmpdir), extent=extent, + fingerprint=fingerprint, fingerprint_map=fingerprint_map) def setFieldMapValue(fld : ogr.FieldDefn, idx : int, @@ -581,7 +598,8 @@ def setFieldMapValue(fld : ogr.FieldDefn, # pylint: disable-next=too-many-branches, too-many-locals, 
too-many-statements def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], - basedir : Path|None, extent : ogr.Geometry|None) -> None: + basedir : Path|None, extent : ogr.Geometry|None, + fingerprint, fingerprint_map) -> None: """Import a source layer (already extracted) This is more or less like ogr2ogr/GDALVectorTranslate() but we roll out our own (slower) version because GDALVectorTranslate() insists in @@ -638,7 +656,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], defn = lyr.GetLayerDefn() geomFieldCount = defn.GetGeomFieldCount() - if geomFieldCount != 1: # TODO Add support for multiple geometry fields + if geomFieldCount != 1: + # TODO Add support for multiple geometry fields (also in the fingerprinting logic below) logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount) fieldCount = defn.GetFieldCount() @@ -821,6 +840,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented') feature2.SetGeometryDirectly(geom) + fingerprintFeature(fingerprint, feature2, fingerprint_map) + if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}') @@ -847,10 +868,14 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) -def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, - last_updated : datetime, - savepoint : str|None = None) -> None: - """Update attributes in the layer cache for the given layer name.""" +def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, + layername : str, cache : ogr.Layer, + last_updated : datetime, fingerprint : bytes, + lyrTransaction : str|bool|None = None, + force : bool = False) -> bool: + """Update attributes in the layer cache for the given layer name. 
+ Return a boolean indicating whether changes to layername were *not* + rolled back (hence might still be outstanding in the transaction).""" attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) cache.SetAttributeFilter(attributeFilter) @@ -862,9 +887,11 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, update = False feature = ogr.Feature(cache.GetLayerDefn()) feature.SetFieldString(0, layername) + fingerprint_old = None else: logging.debug('Updating existing feature in layer cache for %s', attributeFilter) update = True + fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None assert cache.GetNextFeature() is None if not gdalVersionMin(maj=3, min=8): @@ -887,8 +914,44 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, last_updated.minute, float(last_updated.second) + float(last_updated.microsecond)/1000000., tzFlag) + if fingerprint is None: + feature.SetFieldNull(2) + else: + # https://lists.osgeo.org/pipermail/gdal-dev/2020-December/053170.html + feature.SetFieldBinaryFromHexString(2, fingerprint.hex()) + ret = True if update: + if not (force or fingerprint is None or fingerprint_old is None or + fingerprint != fingerprint_old): + # no change: rollback (*before* updating the cache) if possible to retain FID values + if isinstance(lyrTransaction, str): + logging.info('Updated layer "%s" has identical fingerprint %s, rolling back', + layername, fingerprint.hex()[:8]) + query = 'ROLLBACK TO ' + lyrTransaction + logging.debug('%s', query) + try: + ds.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not execute SQL: %s', query) + else: + ret = False + elif isinstance(lyrTransaction, bool) and lyrTransaction: + logging.info('Updated layer "%s" has identical fingerprint %s, rolling back', + layername, fingerprint.hex()[:8]) + 
try: + if lyr.RollbackTransaction() == ogr.OGRERR_NONE: + ret = False + else: + logging.error('Could not rollback transaction on layer "%s"', + layername) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction on layer "%s"', + layername) + else: + logging.info('Updated layer "%s" has new fingerprint %s', + layername, fingerprint.hex()[:8]) + # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead if cache.SetFeature(feature) != ogr.OGRERR_NONE: raise RuntimeError('Could not update feature in layer cache') @@ -899,3 +962,111 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed ds.FlushCache() + return ret + +SERIALIZE_BOOL : Final = struct.Struct('@?').pack +SERIALIZE_SHORT : Final = struct.Struct('@h').pack +SERIALIZE_USHORT : Final = struct.Struct('@H').pack +SERIALIZE_LONG : Final = struct.Struct('@l').pack +SERIALIZE_ULONG : Final = struct.Struct('@L').pack +SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack +SERIALIZE_FLOAT : Final = struct.Struct('@f').pack +SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack +SERIALIZE_DATE : Final = struct.Struct('@HBB').pack +SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack +SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack + +def _fingerprintBinary(m, v : bytes) -> None: + m.update(SERIALIZE_USHORT(len(v))) + m.update(v) +def _fingerprintDate(m, f : ogr.Feature, i : int) -> None: + year, month, day, *_ = f.GetFieldAsDateTime(i) + m.update(SERIALIZE_DATE(year, month, day)) +def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None: + year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i) + m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz)) +def _fingerprintTime(m, f : ogr.Feature, i : int) -> None: + _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i) + 
m.update(SERIALIZE_TIME(hour, minute, second, tz)) +def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None: + v = bytes.fromhex(f.GetFieldAsString(i).replace('-', '')) + #assert len(v) == 16 + m.update(v) + +def fingerprintLayerDefn(m, lyr : ogr.Layer): + """Fingerprint a layer definition (schema) and return the + fingerprinting recipe mapping for its features.""" + defn = lyr.GetLayerDefn() + + n = defn.GetFieldCount() + if n > 65535: + raise RuntimeError(f'Too many fields {n}') + m.update(SERIALIZE_USHORT(n)) + + ret = [None] * n + for i in range(n): + defn2 = defn.GetFieldDefn(i) + _fingerprintBinary(m, defn2.GetName().encode('utf-8')) + + fldType = defn2.GetType() + m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13 + + fldSubType = defn2.GetSubType() + m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5 + + m.update(SERIALIZE_LONG(defn2.GetWidth())) + + if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone: + ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j))) + elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean: + ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j))) + elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16: + ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j))) + elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone: + ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j))) + elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32: + ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j))) + elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone: + ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j)) + elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID: + ret[i] = _fingerprintUUID + elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone: + ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j)) + elif fldType == ogr.OFTDate and fldSubType == 
ogr.OFSTNone: + ret[i] = _fingerprintDate + elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone: + ret[i] = _fingerprintTime + elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone: + ret[i] = _fingerprintDateTime + elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone: + ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j))) + + if ret[i] is None: + raise RuntimeError('Serialization not implemented for type ' + + ogr.GetFieldTypeName(fldType) + ', subtype ' + + ogr.GetFieldSubTypeName(fldSubType)) + + # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2() + m.update(SERIALIZE_LONG(lyr.GetGeomType())) + return tuple(ret) + +def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint + """Fingerprint a feature using the given fingerprinting recipe.""" + # TODO bitmap for nulls + for i, f in enumerate(x): + if not feature.IsFieldSet(i): + #raise RuntimeError(f'Field #{i} + # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"' + # 'is not set.') + m.update(SERIALIZE_BOOL(False)) + elif feature.IsFieldNull(i): + # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there + # are no builtin functions for that and since we're hashing anyway it's not worth + # the trouble + m.update(SERIALIZE_BOOL(False)) + else: + m.update(SERIALIZE_BOOL(True)) + f(m, feature, i) + wkb = feature.GetGeometryRef().ExportToIsoWkb() + m.update(SERIALIZE_ULONG(len(wkb))) + m.update(wkb) @@ -3012,7 +3012,8 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin CREATE TABLE public.layercache ( ogc_fid bigint NOT NULL, layername character varying(255) NOT NULL, - last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL + last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, + fingerprint bytea NOT NULL ); diff --git a/webmap-import b/webmap-import index c930730..c801b6d 100755 --- a/webmap-import +++ 
b/webmap-import @@ -289,7 +289,8 @@ def processOutputLayer(ds : gdal.Dataset, return importSources(dso=ds, lyr=lyr, sources=sources, cachedir=cachedir, extent=extent, dsoTransaction=dsTransaction, - lyrcache=lyrcache) + lyrcache=lyrcache, + force=force) def validate_sources(layers : dict[str, Any]) -> None: """Mangle and validate layer sources and import definitions""" @@ -394,6 +395,8 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool: 'nullable': False, 'unique': True, 'width': 255 }, { 'name': 'last_updated', 'typ': ogr.OFTDateTime, 'nullable': False }, + { 'name': 'fingerprint', 'typ': ogr.OFTBinary, + 'nullable': False, 'width': 32 }, ] m = len(fields) n = defn.GetFieldCount() @@ -495,8 +498,11 @@ def areSourceFilesNewer(layername : str, microsecond=round(ms*1000000), tzinfo=tz ) - logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername, - dt.isoformat(timespec='microseconds')) + fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None + logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s', + layername, + dt.isoformat(timespec='microseconds'), + fpr.hex() if fpr is not None else 'NULL') ret = int(dt.timestamp() * 1000000.) * 1000 < t if lyrcache.GetNextFeature() is not None: |