author     Guilhem Moulin <guilhem@fripost.org>    2025-05-01 21:20:44 +0200
committer  Guilhem Moulin <guilhem@fripost.org>    2025-05-20 09:51:54 +0200
commit     12bd18ed5e01a84b03be7c21570bac6547759970 (patch)
tree       ec491f29beca20bc4657f34ae7244b9f52321b0a
parent     3edce255b3010244ab5d7fae59cbda11926f50f1 (diff)
Move part of the fingerprinting logic into PostgreSQL when possible.
This allows ordering features before hashing, which is required for
layers from Naturvårdsverket and Skogsstyrelsen (features appear to be
randomly ordered in daily exports, so normalization and fingerprinting
are needed to detect whether there are any changes).
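As a minimal illustration (not part of this commit) of why ordering
matters: a streaming digest such as SHA-256 is order-sensitive, so two
exports containing identical features in a different order fingerprint
differently unless they are normalized first:

    from hashlib import sha256
    rows = [b'feature-1', b'feature-2']
    h1 = sha256(b''.join(rows)).digest()
    h2 = sha256(b''.join(reversed(rows))).digest()
    assert h1 != h2   # same rows, different order => different digest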
On the downside, this makes the cache a PostgreSQL-only feature. It's
also marginally slower than the old logic because for some reason
PostgreSQL doesn't seem to use the UNIQUE index and instead does a seq
scan followed by a quicksort.
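For reference, the fingerprinting query assembled in updateLayerCache()
below has roughly the following shape (column names are placeholders):

    SELECT hash_record_extended(ROW(t."col1", t."col2"), 0) AS hash_properties,
           sha256(COALESCE(ST_AsEWKB(t."geom"), '')) AS hash_geom
    FROM "layername" t
    ORDER BY t."unique_not_null_col"

One would expect the ORDER BY on a UNIQUE NOT NULL column to be served
by an index scan, hence the surprise at the seq scan + quicksort plan.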
Without fingerprinting logic:
$ time -f "%E (%U user, %S sys) %Mk maxres" \
/usr/local/bin/webmap-import \
--cachedir=/var/cache/webmap \
--lockfile=/run/lock/webmap/lock \
--lockdir-sources=/run/lock/webmap/cache \
--force \
"sks:UtfordAvverk"
[…]
INFO: Layer "sks:UtfordAvverk" has 313044 features
[…]
3:54.45 (85.28 user, 26.19 sys) 72520k maxres
With the old fingerprinting logic (full client-side SHA-256 digest of
the features as they are being imported):
$ time -f "%E (%U user, %S sys) %Mk maxres" \
/usr/local/bin/webmap-import \
--cachedir=/var/cache/webmap \
--lockfile=/run/lock/webmap/lock \
--lockdir-sources=/run/lock/webmap/cache \
--force \
"sks:UtfordAvverk"
[…]
INFO: Imported 313044 features from source layer "UtfordAvverkningYta"
[…]
INFO: Updated layer "sks:UtfordAvverk" has new fingerprint e655a97a
4:15.65 (108.46 user, 26.73 sys) 80672k maxres
With the new fingerprinting logic (hybrid client/server SHA-256 digest
over hash_record_extended() output, computed after the import process):
$ time -f "%E (%U user, %S sys) %Mk maxres" \
/usr/local/bin/webmap-import \
--cachedir=/var/cache/webmap \
--lockfile=/run/lock/webmap/lock \
--lockdir-sources=/run/lock/webmap/cache \
--force \
"sks:UtfordAvverk"
[…]
INFO: Layer "sks:UtfordAvverk" has 313044 features
[…]
4:30.77 (87.02 user, 25.67 sys) 72856k maxres
Same but without ORDER BY (or ORDER BY ogc_fid):
4:07.52 (88.23 user, 26.58 sys) 72060k maxres
(A server-side incremental hash function would be better, but no such
thing currently exists, and the only way to hash fully server-side is
to aggregate the rows into an array, which would be too expensive
memory-wise for large tables.)
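For comparison, a hypothetical fully server-side variant (not what this
commit does) would have to fold all rows into a single value before
hashing, e.g. via string_agg() or array_agg(); column names below are
placeholders:

    SELECT sha256(string_agg(ST_AsEWKB(t."geom"), ''::bytea ORDER BY t."id"))
    FROM "layername" t

The whole concatenation is materialized in memory before sha256() sees
it, hence the memory concern above.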
-rw-r--r--   import_source.py   243
-rwxr-xr-x   webmap-import        4
2 files changed, 107 insertions, 140 deletions
diff --git a/import_source.py b/import_source.py
index 647c79e..6fe4acf 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,9 +26,9 @@ import re
 from fnmatch import fnmatchcase
 from pathlib import Path
 from datetime import datetime, timedelta, UTC
-from typing import Any, Final, Optional
+from typing import Any, Final, Iterator, Optional
 import traceback
-from enum import Enum, unique
+from enum import Enum, unique as enum_unique
 from hashlib import sha256
 import struct
@@ -436,7 +436,7 @@ def listArchiveMembers(namelist : list[str],
             logging.debug('Ignoring archive member %s', name)
     return members
 
-@unique
+@enum_unique
 class ImportStatus(Enum):
     """Return value for importSources(): success, error, or no-change."""
     IMPORT_SUCCESS = 0
@@ -472,8 +472,6 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
     rv = ImportStatus.IMPORT_NOCHANGE
     now = datetime.now().astimezone()
 
-    fingerprint = sha256()
-    fingerprint_map = fingerprintLayerDefn(fingerprint, lyr)
     try:
         clearLayer(dso, lyr) # TODO conditional (only if not new)?
@@ -481,24 +479,19 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
             _importSource(lyr, **source['source'],
                           args=source['import'],
                           cachedir=cachedir,
-                          extent=extent,
-                          fingerprint=fingerprint,
-                          fingerprint_map=fingerprint_map)
+                          extent=extent)
 
         # force the PG driver to call EndCopy() to detect errors and trigger a
         # rollback if needed
         dso.FlushCache()
 
-        fingerprint = fingerprint.digest()
         if lyrcache is None:
             rv = ImportStatus.IMPORT_SUCCESS
-        elif updateLayerCache(layername=layername,
-                              cache=lyrcache,
+        elif updateLayerCache(cache=lyrcache,
                               ds=dso, lyr=lyr,
                               force=force,
                               lyrTransaction=lyrTransaction,
-                              last_updated=now,
-                              fingerprint=fingerprint):
+                              last_updated=now):
             rv = ImportStatus.IMPORT_SUCCESS
         else:
             rv = ImportStatus.IMPORT_NOCHANGE
@@ -553,13 +546,10 @@ def _importSource(lyr : ogr.Layer,
                   unar : dict[str,Any]|None = None,
                   args : dict[str,Any] = {},
                   cachedir : Path|None = None,
-                  extent : ogr.Geometry|None = None,
-                  fingerprint = None, fingerprint_map = None) -> None:
+                  extent : ogr.Geometry|None = None) -> None:
     """Import a source layer"""
     if unar is None:
-        return _importSource2(lyr, path, args=args,
-                              basedir=cachedir, extent=extent,
-                              fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+        return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent)
 
     ds_srcpath = Path(args['path'])
     if ds_srcpath.is_absolute():
@@ -575,9 +565,7 @@ def _importSource(lyr : ogr.Layer,
               fmt=unar.get('format', None),
               patterns=unar.get('patterns', None),
               exact_matches=[ds_srcpath])
-    return _importSource2(lyr, ds_srcpath, args=args,
-                          basedir=Path(tmpdir), extent=extent,
-                          fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+    return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent)
 
 def setFieldMapValue(fld : ogr.FieldDefn,
                      idx : int,
@@ -609,8 +597,7 @@ def setFieldMapValue(fld : ogr.FieldDefn,
 
 # pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
 def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
-                   basedir : Path|None, extent : ogr.Geometry|None,
-                   fingerprint, fingerprint_map) -> None:
+                   basedir : Path|None, extent : ogr.Geometry|None) -> None:
     """Import a source layer (already extracted)
 
     This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
     out our own (slower) version because GDALVectorTranslate() insists in
@@ -851,8 +838,6 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
                                       f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
                 feature2.SetGeometryDirectly(geom)
 
-            fingerprintFeature(fingerprint, feature2, fingerprint_map)
-
             if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
                 raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
@@ -879,14 +864,94 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
         logging.info('Forced conversion to %s: %s',
                      ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
 
-def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
-                     layername : str, cache : ogr.Layer,
-                     last_updated : datetime, fingerprint : bytes,
+def listFieldsOrderBy(defn : ogr.FeatureDefn,
+                      unique : bool|None = None,
+                      nullable : bool|None = None) -> Iterator[str]:
+    """Return an iterator of column names suitable for ORDER BY."""
+    fields_str = {}
+    for i in range(defn.GetFieldCount()):
+        fld = defn.GetFieldDefn(i)
+        if (fld.IsIgnored() or
+            # if 'unique' or 'nullable' is not None then skip the field
+            # unless the boolean matches .IsUnique() resp. .IsNullable()
+            not (unique is None or fld.IsUnique() == unique) or
+            not (nullable is None or fld.IsNullable() == nullable)):
+            continue
+        if fld.GetType() in (ogr.OFTInteger, ogr.OFTInteger64):
+            # list integers first
+            yield fld.GetName()
+        elif fld.GetType() == ogr.OFTString:
+            w = fld.GetWidth()
+            if 0 < w < 256:
+                # only consider short-ish strings
+                fields_str[fld.GetName()] = w
+    # order string columns by width
+    for c,_ in sorted(fields_str.items(), key=lambda x:x[1]):
+        yield c
+
+# pylint: disable-next=too-many-branches, too-many-statements
+def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
+                     last_updated : datetime,
                      lyrTransaction : str|bool|None = None,
                      force : bool = False) -> bool:
     """Update attributes in the layer cache for the given layer name.
     Return a boolean indicating whether changes to layername were *not*
     rolled back (hence might still be outstanding in the transaction)."""
+    layername = lyr.GetName()
+
+    dgst = sha256()
+    defn = lyr.GetLayerDefn()
+    fields = []
+    for i in range(defn.GetFieldCount()):
+        fields.append('t.' + escape_identifier(defn.GetFieldDefn(i).GetName()))
+    if len(fields) == 0:
+        fields = ['0 AS hash_properties']
+    else:
+        fields = [ 'hash_record_extended(ROW(' + ','.join(fields) + '),0) AS hash_properties' ]
+
+    fidColumn = lyr.GetFIDColumn()
+    if fidColumn is None or fidColumn == '':
+        raise RuntimeError(f'Couldn\'t find FID column for "{layername}"')
+    # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
+    geometryColumn = lyr.GetGeometryColumn()
+    if geometryColumn is None or geometryColumn == '':
+        raise RuntimeError(f'Couldn\'t find geometry column for "{layername}"')
+
+    fields.append('sha256(COALESCE('
+                  + 'ST_AsEWKB(t.' + escape_identifier(geometryColumn) + '),'
+                  + '\'\')) AS hash_geom')
+    if len(fields) == 0:
+        raise RuntimeError('Empty field list in SELECT')
+    query = 'SELECT ' + ','.join(fields) + ' FROM ' + escape_identifier(layername) + ' t'
+
+    sort_by = next(listFieldsOrderBy(defn, unique=True, nullable=False), None)
+    if sort_by is not None:
+        sort_by = [ sort_by ]
+    else:
+        count = lyr.GetFeatureCount(force=0)
+        if count is None or count < 0 or count > 5000:
+            logging.warning('Layer "%s" has many (%s) features but no UNIQUE NOT NULL constraint, '
+                            'sorting might be unstable and slow', layername,
+                            str(count) if (count is not None and count >= 0) else 'N/A')
+        sort_by = list(listFieldsOrderBy(defn)) + [ geometryColumn, fidColumn ]
+    query += ' ORDER BY ' + ','.join(['t.' + escape_identifier(c) for c in sort_by])
+
+    struct_dgst : Final = struct.Struct('@qq').pack
+    logging.debug('%s', query)
+    lyr2 = ds.ExecuteSQL(query)
+    try:
+        assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties'
+        assert lyr2.GetLayerDefn().GetFieldDefn(1).GetName() == 'hash_geom'
+        feature = lyr2.GetNextFeature()
+        while feature is not None:
+            dgst.update(struct_dgst(feature.GetFID(), feature.GetFieldAsInteger64(0)))
+            dgst.update(feature.GetFieldAsBinary(1))
+            feature = lyr2.GetNextFeature()
+    finally:
+        ds.ReleaseResultSet(lyr2)
+        lyr2 = None
+    fingerprint = dgst.digest()
+
     attributeFilter = 'layername = ' + escape_literal_str(layername)
     logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
     cache.SetAttributeFilter(attributeFilter)
@@ -933,8 +998,13 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
 
     ret = True
     if update:
-        if not (force or fingerprint is None or fingerprint_old is None or
-                fingerprint != fingerprint_old):
+        if (fingerprint is None or fingerprint_old is None or fingerprint != fingerprint_old):
+            logging.info('Updated layer "%s" has new fingerprint %s', layername,
+                         fingerprint.hex()[:8] if fingerprint is not None else 'N/A')
+        elif force:
+            logging.info('Updated layer "%s" has identical fingerprint %s',
+                         layername, fingerprint.hex()[:8])
+        else:
             # no change: rollback (*before* updating the cache) if possible to retain FID values
             if isinstance(lyrTransaction, str):
                 logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
@@ -959,9 +1029,9 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
                 except Exception: # pylint: disable=broad-exception-caught
                     logging.exception('Could not rollback transaction on layer "%s"', layername)
-        else:
-            logging.info('Updated layer "%s" has new fingerprint %s',
-                         layername, fingerprint.hex()[:8])
+            else:
+                logging.info('Updated layer "%s" has identical fingerprint %s',
+                             layername, fingerprint.hex()[:8])
 
     # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
     if cache.SetFeature(feature) != ogr.OGRERR_NONE:
@@ -974,110 +1044,3 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
     # rollback if needed
     ds.FlushCache()
     return ret
-
-SERIALIZE_BOOL : Final = struct.Struct('@?').pack
-SERIALIZE_SHORT : Final = struct.Struct('@h').pack
-SERIALIZE_USHORT : Final = struct.Struct('@H').pack
-SERIALIZE_LONG : Final = struct.Struct('@l').pack
-SERIALIZE_ULONG : Final = struct.Struct('@L').pack
-SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack
-SERIALIZE_FLOAT : Final = struct.Struct('@f').pack
-SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack
-SERIALIZE_DATE : Final = struct.Struct('@HBB').pack
-SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack
-SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack
-
-def _fingerprintBinary(m, v : bytes) -> None:
-    m.update(SERIALIZE_USHORT(len(v)))
-    m.update(v)
-def _fingerprintDate(m, f : ogr.Feature, i : int) -> None:
-    year, month, day, *_ = f.GetFieldAsDateTime(i)
-    m.update(SERIALIZE_DATE(year, month, day))
-def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None:
-    year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
-    m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))
-def _fingerprintTime(m, f : ogr.Feature, i : int) -> None:
-    _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
-    m.update(SERIALIZE_TIME(hour, minute, second, tz))
-def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None:
-    v = bytes.fromhex(f.GetFieldAsString(i).replace('-', ''))
-    #assert len(v) == 16
-    m.update(v)
-
-def fingerprintLayerDefn(m, lyr : ogr.Layer):
-    """Fingerprint a layer definition (schema) and return the
-    fingerprinting recipe mapping for its features."""
-    defn = lyr.GetLayerDefn()
-
-    n = defn.GetFieldCount()
-    if n > 65535:
-        raise RuntimeError(f'Too many fields {n}')
-    m.update(SERIALIZE_USHORT(n))
-
-    ret = [None] * n
-    for i in range(n):
-        defn2 = defn.GetFieldDefn(i)
-        _fingerprintBinary(m, defn2.GetName().encode('utf-8'))
-
-        fldType = defn2.GetType()
-        m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13
-
-        fldSubType = defn2.GetSubType()
-        m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5
-
-        m.update(SERIALIZE_LONG(defn2.GetWidth()))
-
-        if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
-        elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
-        elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
-        elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
-        elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
-        elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
-            ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
-        elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
-            ret[i] = _fingerprintUUID
-        elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
-            ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
-        elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
-            ret[i] = _fingerprintDate
-        elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
-            ret[i] = _fingerprintTime
-        elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
-            ret[i] = _fingerprintDateTime
-        elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
-            ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))
-
-        if ret[i] is None:
-            raise RuntimeError('Serialization not implemented for type '
-                               + ogr.GetFieldTypeName(fldType) + ', subtype '
-                               + ogr.GetFieldSubTypeName(fldSubType))
-
-    # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
-    m.update(SERIALIZE_LONG(lyr.GetGeomType()))
-    return tuple(ret)
-
-def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint
-    """Fingerprint a feature using the given fingerprinting recipe."""
-    # TODO bitmap for nulls
-    for i, f in enumerate(x):
-        if not feature.IsFieldSet(i):
-            #raise RuntimeError(f'Field #{i} "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"'
-            #                   'is not set.')
-            m.update(SERIALIZE_BOOL(False))
-        elif feature.IsFieldNull(i):
-            # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there
-            # are no builtin functions for that and since we're hashing anyway it's not worth
-            # the trouble
-            m.update(SERIALIZE_BOOL(False))
-        else:
-            m.update(SERIALIZE_BOOL(True))
-            f(m, feature, i)
-    wkb = feature.GetGeometryRef().ExportToIsoWkb()
-    m.update(SERIALIZE_ULONG(len(wkb)))
-    m.update(wkb)
diff --git a/webmap-import b/webmap-import
index 6f514a9..f20fdef 100755
--- a/webmap-import
+++ b/webmap-import
@@ -377,6 +377,10 @@ def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
 def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
     """Validate layer cache table."""
+    drvName = ds.GetDriver().ShortName
+    if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
+        logging.warning('Unsupported cache layer for output driver %s', drvName)
+        return False
     lyr = ds.GetLayerByName(name)
     if lyr is None:
         logging.warning('Table "%s" does not exist', name)