aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-22 21:33:42 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-05-01 15:29:51 +0200
commitad38438a0b980ee816e1573bf18362e72345fa4d (patch)
tree8a276f36710dab903155bbdff05f7b516f5a5a00
parent12c3c9ddbf6b3aa36c70fa90477d6ae8e132a230 (diff)
webmap-import: Fingerprint destination layers to detect changes.
Comparing modification times is not enough since some sources (for instance Naturvårdsverket's SCI_Rikstackande) are updated on the server even though no objects are being added; the source layer remains unchanged, but the file differs only because of OBJECTID changes, which we are not interested in.
-rw-r--r--.pylintrc1
-rw-r--r--import_source.py205
-rw-r--r--schema.sql3
-rwxr-xr-xwebmap-import12
4 files changed, 200 insertions, 21 deletions
diff --git a/.pylintrc b/.pylintrc
index 54b0100..b87e779 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -5,3 +5,4 @@ max-locals = 50
max-branches = 25
max-statements = 100
max-nested-blocks = 10
+max-module-lines = 1250
diff --git a/import_source.py b/import_source.py
index 2d2f116..c4cb96f 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,8 +26,10 @@ import re
from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
-from typing import Any, Optional
+from typing import Any, Final, Optional
import traceback
+from hashlib import sha256
+import struct
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -439,7 +441,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
cachedir : Path|None = None,
extent : ogr.Geometry|None = None,
dsoTransaction : bool = False,
- lyrcache : ogr.Layer|None = None) -> bool:
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> bool:
"""Clear lyr and import source layers to it."""
layername = lyr.GetName()
@@ -461,6 +464,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
rv = True
now = datetime.now().astimezone()
+ fingerprint = sha256()
+ fingerprint_map = fingerprintLayerDefn(fingerprint, lyr)
try:
clearLayer(dso, lyr) # TODO conditional (only if not new)?
@@ -468,17 +473,26 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
_importSource(lyr, **source['source'],
args=source['import'],
cachedir=cachedir,
- extent=extent)
+ extent=extent,
+ fingerprint=fingerprint,
+ fingerprint_map=fingerprint_map)
+
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
dso.FlushCache()
+ fingerprint = fingerprint.digest()
if lyrcache is not None:
- updateLayerCache(layername=layername,
- cache=lyrcache,
- ds=dso,
- savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
- last_updated=now)
+ if not updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso, lyr=lyr,
+ force=force,
+ lyrTransaction=lyrTransaction,
+ last_updated=now,
+ fingerprint=fingerprint):
+ if isinstance(lyrTransaction, bool):
+ # the transaction on lyr was already rolled back
+ lyrTransaction = False
except Exception: # pylint: disable=broad-exception-caught
rv = False
@@ -528,11 +542,13 @@ def _importSource(lyr : ogr.Layer,
unar : dict[str,Any]|None = None,
args : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
+ extent : ogr.Geometry|None = None,
+ fingerprint = None, fingerprint_map = None) -> None:
"""Import a source layer"""
if unar is None:
return _importSource2(lyr, path, args=args,
- basedir=cachedir, extent=extent)
+ basedir=cachedir, extent=extent,
+ fingerprint=fingerprint, fingerprint_map=fingerprint_map)
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
@@ -549,7 +565,8 @@ def _importSource(lyr : ogr.Layer,
patterns=unar.get('patterns', None),
exact_matches=[ds_srcpath])
return _importSource2(lyr, ds_srcpath, args=args,
- basedir=Path(tmpdir), extent=extent)
+ basedir=Path(tmpdir), extent=extent,
+ fingerprint=fingerprint, fingerprint_map=fingerprint_map)
def setFieldMapValue(fld : ogr.FieldDefn,
idx : int,
@@ -581,7 +598,8 @@ def setFieldMapValue(fld : ogr.FieldDefn,
# pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
- basedir : Path|None, extent : ogr.Geometry|None) -> None:
+ basedir : Path|None, extent : ogr.Geometry|None,
+ fingerprint, fingerprint_map) -> None:
"""Import a source layer (already extracted)
This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
out our own (slower) version because GDALVectorTranslate() insists in
@@ -638,7 +656,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
defn = lyr.GetLayerDefn()
geomFieldCount = defn.GetGeomFieldCount()
- if geomFieldCount != 1: # TODO Add support for multiple geometry fields
+ if geomFieldCount != 1:
+ # TODO Add support for multiple geometry fields (also in the fingerprinting logic below)
logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
fieldCount = defn.GetFieldCount()
@@ -821,6 +840,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
feature2.SetGeometryDirectly(geom)
+ fingerprintFeature(fingerprint, feature2, fingerprint_map)
+
if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
@@ -847,10 +868,14 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
- last_updated : datetime,
- savepoint : str|None = None) -> None:
- """Update attributes in the layer cache for the given layer name."""
+def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
+ layername : str, cache : ogr.Layer,
+ last_updated : datetime, fingerprint : bytes,
+ lyrTransaction : str|bool|None = None,
+ force : bool = False) -> bool:
+ """Update attributes in the layer cache for the given layer name.
+ Return a boolean indicating whether changes to layername were *not*
+ rolled back (hence might still be outstanding in the transaction)."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
cache.SetAttributeFilter(attributeFilter)
@@ -862,9 +887,11 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
update = False
feature = ogr.Feature(cache.GetLayerDefn())
feature.SetFieldString(0, layername)
+ fingerprint_old = None
else:
logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
update = True
+ fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
assert cache.GetNextFeature() is None
if not gdalVersionMin(maj=3, min=8):
@@ -887,8 +914,44 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
last_updated.minute,
float(last_updated.second) + float(last_updated.microsecond)/1000000.,
tzFlag)
+ if fingerprint is None:
+ feature.SetFieldNull(2)
+ else:
+ # https://lists.osgeo.org/pipermail/gdal-dev/2020-December/053170.html
+ feature.SetFieldBinaryFromHexString(2, fingerprint.hex())
+ ret = True
if update:
+ if not (force or fingerprint is None or fingerprint_old is None or
+ fingerprint != fingerprint_old):
+ # no change: rollback (*before* updating the cache) if possible to retain FID values
+ if isinstance(lyrTransaction, str):
+ logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
+ layername, fingerprint.hex()[:8])
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ ds.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ else:
+ ret = False
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
+ layername, fingerprint.hex()[:8])
+ try:
+ if lyr.RollbackTransaction() == ogr.OGRERR_NONE:
+ ret = False
+ else:
+ logging.error('Could not rollback transaction on layer "%s"',
+ layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"',
+ layername)
+ else:
+ logging.info('Updated layer "%s" has new fingerprint %s',
+ layername, fingerprint.hex()[:8])
+
# TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
if cache.SetFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not update feature in layer cache')
@@ -899,3 +962,111 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
ds.FlushCache()
+ return ret
+
+SERIALIZE_BOOL : Final = struct.Struct('@?').pack
+SERIALIZE_SHORT : Final = struct.Struct('@h').pack
+SERIALIZE_USHORT : Final = struct.Struct('@H').pack
+SERIALIZE_LONG : Final = struct.Struct('@l').pack
+SERIALIZE_ULONG : Final = struct.Struct('@L').pack
+SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack
+SERIALIZE_FLOAT : Final = struct.Struct('@f').pack
+SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack
+SERIALIZE_DATE : Final = struct.Struct('@HBB').pack
+SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack
+SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack
+
+def _fingerprintBinary(m, v : bytes) -> None:
+ m.update(SERIALIZE_USHORT(len(v)))
+ m.update(v)
+def _fingerprintDate(m, f : ogr.Feature, i : int) -> None:
+ year, month, day, *_ = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_DATE(year, month, day))
+def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None:
+ year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))
+def _fingerprintTime(m, f : ogr.Feature, i : int) -> None:
+ _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_TIME(hour, minute, second, tz))
+def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None:
+ v = bytes.fromhex(f.GetFieldAsString(i).replace('-', ''))
+ #assert len(v) == 16
+ m.update(v)
+
+def fingerprintLayerDefn(m, lyr : ogr.Layer):
+ """Fingerprint a layer definition (schema) and return the
+ fingerprinting recipe mapping for its features."""
+ defn = lyr.GetLayerDefn()
+
+ n = defn.GetFieldCount()
+ if n > 65535:
+ raise RuntimeError(f'Too many fields {n}')
+ m.update(SERIALIZE_USHORT(n))
+
+ ret = [None] * n
+ for i in range(n):
+ defn2 = defn.GetFieldDefn(i)
+ _fingerprintBinary(m, defn2.GetName().encode('utf-8'))
+
+ fldType = defn2.GetType()
+ m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13
+
+ fldSubType = defn2.GetSubType()
+ m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5
+
+ m.update(SERIALIZE_LONG(defn2.GetWidth()))
+
+ if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
+ elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
+ elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
+ elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
+ ret[i] = _fingerprintUUID
+ elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
+ elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintDate
+ elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintTime
+ elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintDateTime
+ elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))
+
+ if ret[i] is None:
+ raise RuntimeError('Serialization not implemented for type '
+ + ogr.GetFieldTypeName(fldType) + ', subtype '
+ + ogr.GetFieldSubTypeName(fldSubType))
+
+ # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
+ m.update(SERIALIZE_LONG(lyr.GetGeomType()))
+ return tuple(ret)
+
+def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint
+ """Fingerprint a feature using the given fingerprinting recipe."""
+ # TODO bitmap for nulls
+ for i, f in enumerate(x):
+ if not feature.IsFieldSet(i):
+ #raise RuntimeError(f'Field #{i}
+ # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"'
+ # 'is not set.')
+ m.update(SERIALIZE_BOOL(False))
+ elif feature.IsFieldNull(i):
+ # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there
+ # are no builtin functions for that and since we're hashing anyway it's not worth
+ # the trouble
+ m.update(SERIALIZE_BOOL(False))
+ else:
+ m.update(SERIALIZE_BOOL(True))
+ f(m, feature, i)
+ wkb = feature.GetGeometryRef().ExportToIsoWkb()
+ m.update(SERIALIZE_ULONG(len(wkb)))
+ m.update(wkb)
diff --git a/schema.sql b/schema.sql
index 0afcee2..c9079f9 100644
--- a/schema.sql
+++ b/schema.sql
@@ -3012,7 +3012,8 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
CREATE TABLE public.layercache (
ogc_fid bigint NOT NULL,
layername character varying(255) NOT NULL,
- last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
+ last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ fingerprint bytea NOT NULL
);
diff --git a/webmap-import b/webmap-import
index c930730..c801b6d 100755
--- a/webmap-import
+++ b/webmap-import
@@ -289,7 +289,8 @@ def processOutputLayer(ds : gdal.Dataset,
return importSources(dso=ds, lyr=lyr, sources=sources,
cachedir=cachedir, extent=extent,
dsoTransaction=dsTransaction,
- lyrcache=lyrcache)
+ lyrcache=lyrcache,
+ force=force)
def validate_sources(layers : dict[str, Any]) -> None:
"""Mangle and validate layer sources and import definitions"""
@@ -394,6 +395,8 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
'nullable': False, 'unique': True, 'width': 255 },
{ 'name': 'last_updated', 'typ': ogr.OFTDateTime,
'nullable': False },
+ { 'name': 'fingerprint', 'typ': ogr.OFTBinary,
+ 'nullable': False, 'width': 32 },
]
m = len(fields)
n = defn.GetFieldCount()
@@ -495,8 +498,11 @@ def areSourceFilesNewer(layername : str,
microsecond=round(ms*1000000),
tzinfo=tz
)
- logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername,
- dt.isoformat(timespec='microseconds'))
+ fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
+ logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
+ layername,
+ dt.isoformat(timespec='microseconds'),
+ fpr.hex() if fpr is not None else 'NULL')
ret = int(dt.timestamp() * 1000000.) * 1000 < t
if lyrcache.GetNextFeature() is not None: