aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-05-01 21:20:44 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-05-20 09:51:54 +0200
commit12bd18ed5e01a84b03be7c21570bac6547759970 (patch)
treeec491f29beca20bc4657f34ae7244b9f52321b0a
parent3edce255b3010244ab5d7fae59cbda11926f50f1 (diff)
Move part of the fingerprinting logic into PostgreSQL when possible.
This allows ordering features before hashing, which is required for layers from Naturvårdsverket and Skogsstyrelsen (features appear to be randomly ordered in daily exports, so normalization and fingerprinting are needed to detect whether there are any changes). On the downside, this makes the cache a PostgreSQL-only feature. It's also marginally slower than the old logic because for some reason PostgreSQL doesn't seem to use the UNIQUE index and instead does a seq scan followed by a quicksort. Without fingerprinting logic: $ time -f "%E (%U user, %S sys) %Mk maxres" /usr/local/bin/webmap-import \ --cachedir=/var/cache/webmap \ --lockfile=/run/lock/webmap/lock \ --lockdir-sources=/run/lock/webmap/cache \ --force \ "sks:UtfordAvverk" […] INFO: Layer "sks:UtfordAvverk" has 313044 features […] 3:54.45 (85.28 user, 26.19 sys) 72520k maxres With old fingerprinting logic (full client-side SHA-256 digest of features as they are being imported): $ time -f "%E (%U user, %S sys) %Mk maxres" /usr/local/bin/webmap-import \ --cachedir=/var/cache/webmap \ --lockfile=/run/lock/webmap/lock \ --lockdir-sources=/run/lock/webmap/cache \ --force \ "sks:UtfordAvverk" […] INFO: Imported 313044 features from source layer "UtfordAvverkningYta" […] INFO: Updated layer "sks:UtfordAvverk" has new fingerprint e655a97a 4:15.65 (108.46 user, 26.73 sys) 80672k maxres With new fingerprinting logic (hybrid client/server SHA-256 digest and hash_record_extended() calls after the import process): $ time -f "%E (%U user, %S sys) %Mk maxres" /usr/local/bin/webmap-import \ --cachedir=/var/cache/webmap \ --lockfile=/run/lock/webmap/lock \ --lockdir-sources=/run/lock/webmap/cache \ --force \ "sks:UtfordAvverk" […] INFO: Layer "sks:UtfordAvverk" has 313044 features […] 4:30.77 (87.02 user, 25.67 sys) 72856k maxres Same but without ORDER BY (or ORDER BY ogc_fid): 4:07.52 (88.23 user, 26.58 sys) 72060k maxres (A server side incremental hash function would be better, but there is no such thing currently and 
the only way to hash fully server side is to aggregate rows in an array which would be too expensive memory-wise for large tables.)
-rw-r--r--import_source.py243
-rwxr-xr-xwebmap-import4
2 files changed, 107 insertions, 140 deletions
diff --git a/import_source.py b/import_source.py
index 647c79e..6fe4acf 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,9 +26,9 @@ import re
from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
-from typing import Any, Final, Optional
+from typing import Any, Final, Iterator, Optional
import traceback
-from enum import Enum, unique
+from enum import Enum, unique as enum_unique
from hashlib import sha256
import struct
@@ -436,7 +436,7 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
-@unique
+@enum_unique
class ImportStatus(Enum):
"""Return value for importSources(): success, error, or no-change."""
IMPORT_SUCCESS = 0
@@ -472,8 +472,6 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
rv = ImportStatus.IMPORT_NOCHANGE
now = datetime.now().astimezone()
- fingerprint = sha256()
- fingerprint_map = fingerprintLayerDefn(fingerprint, lyr)
try:
clearLayer(dso, lyr) # TODO conditional (only if not new)?
@@ -481,24 +479,19 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
_importSource(lyr, **source['source'],
args=source['import'],
cachedir=cachedir,
- extent=extent,
- fingerprint=fingerprint,
- fingerprint_map=fingerprint_map)
+ extent=extent)
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
dso.FlushCache()
- fingerprint = fingerprint.digest()
if lyrcache is None:
rv = ImportStatus.IMPORT_SUCCESS
- elif updateLayerCache(layername=layername,
- cache=lyrcache,
+ elif updateLayerCache(cache=lyrcache,
ds=dso, lyr=lyr,
force=force,
lyrTransaction=lyrTransaction,
- last_updated=now,
- fingerprint=fingerprint):
+ last_updated=now):
rv = ImportStatus.IMPORT_SUCCESS
else:
rv = ImportStatus.IMPORT_NOCHANGE
@@ -553,13 +546,10 @@ def _importSource(lyr : ogr.Layer,
unar : dict[str,Any]|None = None,
args : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None,
- fingerprint = None, fingerprint_map = None) -> None:
+ extent : ogr.Geometry|None = None) -> None:
"""Import a source layer"""
if unar is None:
- return _importSource2(lyr, path, args=args,
- basedir=cachedir, extent=extent,
- fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+ return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent)
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
@@ -575,9 +565,7 @@ def _importSource(lyr : ogr.Layer,
fmt=unar.get('format', None),
patterns=unar.get('patterns', None),
exact_matches=[ds_srcpath])
- return _importSource2(lyr, ds_srcpath, args=args,
- basedir=Path(tmpdir), extent=extent,
- fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+ return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent)
def setFieldMapValue(fld : ogr.FieldDefn,
idx : int,
@@ -609,8 +597,7 @@ def setFieldMapValue(fld : ogr.FieldDefn,
# pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
- basedir : Path|None, extent : ogr.Geometry|None,
- fingerprint, fingerprint_map) -> None:
+ basedir : Path|None, extent : ogr.Geometry|None) -> None:
"""Import a source layer (already extracted)
This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
out our own (slower) version because GDALVectorTranslate() insists in
@@ -851,8 +838,6 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
feature2.SetGeometryDirectly(geom)
- fingerprintFeature(fingerprint, feature2, fingerprint_map)
-
if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
@@ -879,14 +864,94 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
- layername : str, cache : ogr.Layer,
- last_updated : datetime, fingerprint : bytes,
+def listFieldsOrderBy(defn : ogr.FeatureDefn,
+ unique : bool|None = None,
+ nullable : bool|None = None) -> Iterator[str]:
+ """Return an iterator of column names suitable for ORDER BY."""
+ fields_str = {}
+ for i in range(defn.GetFieldCount()):
+ fld = defn.GetFieldDefn(i)
+ if (fld.IsIgnored() or
+            # if 'unique' or 'nullable' is not None then skip the field
+ # unless the boolean matches .IsUnique() resp. .IsNullable()
+ not (unique is None or fld.IsUnique() == unique) or
+ not (nullable is None or fld.IsNullable() == nullable)):
+ continue
+ if fld.GetType() in (ogr.OFTInteger, ogr.OFTInteger64):
+ # list integers first
+ yield fld.GetName()
+ elif fld.GetType() == ogr.OFTString:
+ w = fld.GetWidth()
+ if 0 < w < 256:
+ # only consider short-ish strings
+ fields_str[fld.GetName()] = w
+ # order string columns by width
+ for c,_ in sorted(fields_str.items(), key=lambda x:x[1]):
+ yield c
+
+# pylint: disable-next=too-many-branches, too-many-statements
+def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
+ last_updated : datetime,
lyrTransaction : str|bool|None = None,
force : bool = False) -> bool:
"""Update attributes in the layer cache for the given layer name.
Return a boolean indicating whether changes to layername were *not*
rolled back (hence might still be outstanding in the transaction)."""
+ layername = lyr.GetName()
+
+ dgst = sha256()
+ defn = lyr.GetLayerDefn()
+ fields = []
+ for i in range(defn.GetFieldCount()):
+ fields.append('t.' + escape_identifier(defn.GetFieldDefn(i).GetName()))
+ if len(fields) == 0:
+ fields = ['0 AS hash_properties']
+ else:
+ fields = [ 'hash_record_extended(ROW(' + ','.join(fields) + '),0) AS hash_properties' ]
+
+ fidColumn = lyr.GetFIDColumn()
+ if fidColumn is None or fidColumn == '':
+ raise RuntimeError(f'Couldn\'t find FID column for "{layername}"')
+ # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
+ geometryColumn = lyr.GetGeometryColumn()
+ if geometryColumn is None or geometryColumn == '':
+ raise RuntimeError(f'Couldn\'t find geometry column for "{layername}"')
+
+ fields.append('sha256(COALESCE(' +
+ 'ST_AsEWKB(t.' + escape_identifier(geometryColumn) + '),' +
+ '\'\')) AS hash_geom')
+ if len(fields) == 0:
+ raise RuntimeError('Empty field list in SELECT')
+ query = 'SELECT ' + ','.join(fields) + ' FROM ' + escape_identifier(layername) + ' t'
+
+ sort_by = next(listFieldsOrderBy(defn, unique=True, nullable=False), None)
+ if sort_by is not None:
+ sort_by = [ sort_by ]
+ else:
+ count = lyr.GetFeatureCount(force=0)
+ if count is None or count < 0 or count > 5000:
+ logging.warning('Layer "%s" has many (%s) features but no UNIQUE NOT NULL constraint, '
+ 'sorting might be unstable and slow', layername,
+ str(count) if (count is not None and count >= 0) else 'N/A')
+ sort_by = list(listFieldsOrderBy(defn)) + [ geometryColumn, fidColumn ]
+ query += ' ORDER BY ' + ','.join(['t.' + escape_identifier(c) for c in sort_by])
+
+ struct_dgst : Final = struct.Struct('@qq').pack
+ logging.debug('%s', query)
+ lyr2 = ds.ExecuteSQL(query)
+ try:
+ assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties'
+ assert lyr2.GetLayerDefn().GetFieldDefn(1).GetName() == 'hash_geom'
+ feature = lyr2.GetNextFeature()
+ while feature is not None:
+ dgst.update(struct_dgst(feature.GetFID(), feature.GetFieldAsInteger64(0)))
+ dgst.update(feature.GetFieldAsBinary(1))
+ feature = lyr2.GetNextFeature()
+ finally:
+ ds.ReleaseResultSet(lyr2)
+ lyr2 = None
+ fingerprint = dgst.digest()
+
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
cache.SetAttributeFilter(attributeFilter)
@@ -933,8 +998,13 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
ret = True
if update:
- if not (force or fingerprint is None or fingerprint_old is None or
- fingerprint != fingerprint_old):
+ if (fingerprint is None or fingerprint_old is None or fingerprint != fingerprint_old):
+ logging.info('Updated layer "%s" has new fingerprint %s', layername,
+ fingerprint.hex()[:8] if fingerprint is not None else 'N/A')
+ elif force:
+ logging.info('Updated layer "%s" has identical fingerprint %s',
+ layername, fingerprint.hex()[:8])
+ else:
# no change: rollback (*before* updating the cache) if possible to retain FID values
if isinstance(lyrTransaction, str):
logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
@@ -959,9 +1029,9 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
except Exception: # pylint: disable=broad-exception-caught
logging.exception('Could not rollback transaction on layer "%s"',
layername)
- else:
- logging.info('Updated layer "%s" has new fingerprint %s',
- layername, fingerprint.hex()[:8])
+ else:
+ logging.info('Updated layer "%s" has identical fingerprint %s',
+ layername, fingerprint.hex()[:8])
# TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
if cache.SetFeature(feature) != ogr.OGRERR_NONE:
@@ -974,110 +1044,3 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
# rollback if needed
ds.FlushCache()
return ret
-
-SERIALIZE_BOOL : Final = struct.Struct('@?').pack
-SERIALIZE_SHORT : Final = struct.Struct('@h').pack
-SERIALIZE_USHORT : Final = struct.Struct('@H').pack
-SERIALIZE_LONG : Final = struct.Struct('@l').pack
-SERIALIZE_ULONG : Final = struct.Struct('@L').pack
-SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack
-SERIALIZE_FLOAT : Final = struct.Struct('@f').pack
-SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack
-SERIALIZE_DATE : Final = struct.Struct('@HBB').pack
-SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack
-SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack
-
-def _fingerprintBinary(m, v : bytes) -> None:
- m.update(SERIALIZE_USHORT(len(v)))
- m.update(v)
-def _fingerprintDate(m, f : ogr.Feature, i : int) -> None:
- year, month, day, *_ = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_DATE(year, month, day))
-def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None:
- year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))
-def _fingerprintTime(m, f : ogr.Feature, i : int) -> None:
- _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_TIME(hour, minute, second, tz))
-def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None:
- v = bytes.fromhex(f.GetFieldAsString(i).replace('-', ''))
- #assert len(v) == 16
- m.update(v)
-
-def fingerprintLayerDefn(m, lyr : ogr.Layer):
- """Fingerprint a layer definition (schema) and return the
- fingerprinting recipe mapping for its features."""
- defn = lyr.GetLayerDefn()
-
- n = defn.GetFieldCount()
- if n > 65535:
- raise RuntimeError(f'Too many fields {n}')
- m.update(SERIALIZE_USHORT(n))
-
- ret = [None] * n
- for i in range(n):
- defn2 = defn.GetFieldDefn(i)
- _fingerprintBinary(m, defn2.GetName().encode('utf-8'))
-
- fldType = defn2.GetType()
- m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13
-
- fldSubType = defn2.GetSubType()
- m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5
-
- m.update(SERIALIZE_LONG(defn2.GetWidth()))
-
- if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
- elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
- elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
- elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
- ret[i] = _fingerprintUUID
- elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
- elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintDate
- elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintTime
- elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintDateTime
- elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))
-
- if ret[i] is None:
- raise RuntimeError('Serialization not implemented for type '
- + ogr.GetFieldTypeName(fldType) + ', subtype '
- + ogr.GetFieldSubTypeName(fldSubType))
-
- # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
- m.update(SERIALIZE_LONG(lyr.GetGeomType()))
- return tuple(ret)
-
-def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint
- """Fingerprint a feature using the given fingerprinting recipe."""
- # TODO bitmap for nulls
- for i, f in enumerate(x):
- if not feature.IsFieldSet(i):
- #raise RuntimeError(f'Field #{i}
- # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"'
- # 'is not set.')
- m.update(SERIALIZE_BOOL(False))
- elif feature.IsFieldNull(i):
- # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there
- # are no builtin functions for that and since we're hashing anyway it's not worth
- # the trouble
- m.update(SERIALIZE_BOOL(False))
- else:
- m.update(SERIALIZE_BOOL(True))
- f(m, feature, i)
- wkb = feature.GetGeometryRef().ExportToIsoWkb()
- m.update(SERIALIZE_ULONG(len(wkb)))
- m.update(wkb)
diff --git a/webmap-import b/webmap-import
index 6f514a9..f20fdef 100755
--- a/webmap-import
+++ b/webmap-import
@@ -377,6 +377,10 @@ def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
"""Validate layer cache table."""
+ drvName = ds.GetDriver().ShortName
+ if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
+ logging.warning('Unsupported cache layer for output driver %s', drvName)
+ return False
lyr = ds.GetLayerByName(name)
if lyr is None:
logging.warning('Table "%s" does not exist', name)