diff options
Diffstat (limited to 'import_source.py')
-rw-r--r-- | import_source.py | 243 |
1 files changed, 103 insertions, 140 deletions
diff --git a/import_source.py b/import_source.py index 647c79e..6fe4acf 100644 --- a/import_source.py +++ b/import_source.py @@ -26,9 +26,9 @@ import re from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC -from typing import Any, Final, Optional +from typing import Any, Final, Iterator, Optional import traceback -from enum import Enum, unique +from enum import Enum, unique as enum_unique from hashlib import sha256 import struct @@ -436,7 +436,7 @@ def listArchiveMembers(namelist : list[str], logging.debug('Ignoring archive member %s', name) return members -@unique +@enum_unique class ImportStatus(Enum): """Return value for importSources(): success, error, or no-change.""" IMPORT_SUCCESS = 0 @@ -472,8 +472,6 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, rv = ImportStatus.IMPORT_NOCHANGE now = datetime.now().astimezone() - fingerprint = sha256() - fingerprint_map = fingerprintLayerDefn(fingerprint, lyr) try: clearLayer(dso, lyr) # TODO conditional (only if not new)? @@ -481,24 +479,19 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer, _importSource(lyr, **source['source'], args=source['import'], cachedir=cachedir, - extent=extent, - fingerprint=fingerprint, - fingerprint_map=fingerprint_map) + extent=extent) # force the PG driver to call EndCopy() to detect errors and trigger a # rollback if needed dso.FlushCache() - fingerprint = fingerprint.digest() if lyrcache is None: rv = ImportStatus.IMPORT_SUCCESS - elif updateLayerCache(layername=layername, - cache=lyrcache, + elif updateLayerCache(cache=lyrcache, ds=dso, lyr=lyr, force=force, lyrTransaction=lyrTransaction, - last_updated=now, - fingerprint=fingerprint): + last_updated=now): rv = ImportStatus.IMPORT_SUCCESS else: rv = ImportStatus.IMPORT_NOCHANGE @@ -553,13 +546,10 @@ def _importSource(lyr : ogr.Layer, unar : dict[str,Any]|None = None, args : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None, - fingerprint = None, fingerprint_map = None) -> None: + extent : ogr.Geometry|None = None) -> None: """Import a source layer""" if unar is None: - return _importSource2(lyr, path, args=args, - basedir=cachedir, extent=extent, - fingerprint=fingerprint, fingerprint_map=fingerprint_map) + return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent) ds_srcpath = Path(args['path']) if ds_srcpath.is_absolute(): @@ -575,9 +565,7 @@ def _importSource(lyr : ogr.Layer, fmt=unar.get('format', None), patterns=unar.get('patterns', None), exact_matches=[ds_srcpath]) - return _importSource2(lyr, ds_srcpath, args=args, - basedir=Path(tmpdir), extent=extent, - fingerprint=fingerprint, fingerprint_map=fingerprint_map) + return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent) def setFieldMapValue(fld : ogr.FieldDefn, idx : int, @@ -609,8 +597,7 @@ def setFieldMapValue(fld : ogr.FieldDefn, # pylint: disable-next=too-many-branches, too-many-locals, too-many-statements def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], - basedir : Path|None, extent : ogr.Geometry|None, - fingerprint, fingerprint_map) -> None: + basedir : Path|None, extent : ogr.Geometry|None) -> None: """Import a source layer (already extracted) This is more or less like ogr2ogr/GDALVectorTranslate() but we roll out our own (slower) version because GDALVectorTranslate() insists in @@ -851,8 +838,6 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented') feature2.SetGeometryDirectly(geom) - fingerprintFeature(fingerprint, feature2, fingerprint_map) - if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}') @@ -879,14 +864,94 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) -def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, - layername : str, cache : ogr.Layer, - last_updated : datetime, fingerprint : bytes, +def listFieldsOrderBy(defn : ogr.FeatureDefn, + unique : bool|None = None, + nullable : bool|None = None) -> Iterator[str]: + """Return an iterator of column names suitable for ORDER BY.""" + fields_str = {} + for i in range(defn.GetFieldCount()): + fld = defn.GetFieldDefn(i) + if (fld.IsIgnored() or + # if 'unique' or 'unable' is not None then skip the field + # unless the boolean matches .IsUnique() resp. .IsNullable() + not (unique is None or fld.IsUnique() == unique) or + not (nullable is None or fld.IsNullable() == nullable)): + continue + if fld.GetType() in (ogr.OFTInteger, ogr.OFTInteger64): + # list integers first + yield fld.GetName() + elif fld.GetType() == ogr.OFTString: + w = fld.GetWidth() + if 0 < w < 256: + # only consider short-ish strings + fields_str[fld.GetName()] = w + # order string columns by width + for c,_ in sorted(fields_str.items(), key=lambda x:x[1]): + yield c + +# pylint: disable-next=too-many-branches, too-many-statements +def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer, + last_updated : datetime, lyrTransaction : str|bool|None = None, force : bool = False) -> bool: """Update attributes in the layer cache for the given layer name. Return a boolean indicating whether changes to layername were *not* rolled back (hence might still be outstanding in the transaction).""" + layername = lyr.GetName() + + dgst = sha256() + defn = lyr.GetLayerDefn() + fields = [] + for i in range(defn.GetFieldCount()): + fields.append('t.' + escape_identifier(defn.GetFieldDefn(i).GetName())) + if len(fields) == 0: + fields = ['0 AS hash_properties'] + else: + fields = [ 'hash_record_extended(ROW(' + ','.join(fields) + '),0) AS hash_properties' ] + + fidColumn = lyr.GetFIDColumn() + if fidColumn is None or fidColumn == '': + raise RuntimeError(f'Couldn\'t find FID column for "{layername}"') + # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2() + geometryColumn = lyr.GetGeometryColumn() + if geometryColumn is None or geometryColumn == '': + raise RuntimeError(f'Couldn\'t find geometry column for "{layername}"') + + fields.append('sha256(COALESCE(' + + 'ST_AsEWKB(t.' + escape_identifier(geometryColumn) + '),' + + '\'\')) AS hash_geom') + if len(fields) == 0: + raise RuntimeError('Empty field list in SELECT') + query = 'SELECT ' + ','.join(fields) + ' FROM ' + escape_identifier(layername) + ' t' + + sort_by = next(listFieldsOrderBy(defn, unique=True, nullable=False), None) + if sort_by is not None: + sort_by = [ sort_by ] + else: + count = lyr.GetFeatureCount(force=0) + if count is None or count < 0 or count > 5000: + logging.warning('Layer "%s" has many (%s) features but no UNIQUE NOT NULL constraint, ' + 'sorting might be unstable and slow', layername, + str(count) if (count is not None and count >= 0) else 'N/A') + sort_by = list(listFieldsOrderBy(defn)) + [ geometryColumn, fidColumn ] + query += ' ORDER BY ' + ','.join(['t.' + escape_identifier(c) for c in sort_by]) + + struct_dgst : Final = struct.Struct('@qq').pack + logging.debug('%s', query) + lyr2 = ds.ExecuteSQL(query) + try: + assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties' + assert lyr2.GetLayerDefn().GetFieldDefn(1).GetName() == 'hash_geom' + feature = lyr2.GetNextFeature() + while feature is not None: + dgst.update(struct_dgst(feature.GetFID(), feature.GetFieldAsInteger64(0))) + dgst.update(feature.GetFieldAsBinary(1)) + feature = lyr2.GetNextFeature() + finally: + ds.ReleaseResultSet(lyr2) + lyr2 = None + fingerprint = dgst.digest() + attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) cache.SetAttributeFilter(attributeFilter) @@ -933,8 +998,13 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, ret = True if update: - if not (force or fingerprint is None or fingerprint_old is None or - fingerprint != fingerprint_old): + if (fingerprint is None or fingerprint_old is None or fingerprint != fingerprint_old): + logging.info('Updated layer "%s" has new fingerprint %s', layername, + fingerprint.hex()[:8] if fingerprint is not None else 'N/A') + elif force: + logging.info('Updated layer "%s" has identical fingerprint %s', + layername, fingerprint.hex()[:8]) + else: # no change: rollback (*before* updating the cache) if possible to retain FID values if isinstance(lyrTransaction, str): logging.info('Updated layer "%s" has identical fingerprint %s, rolling back', @@ -959,9 +1029,9 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, except Exception: # pylint: disable=broad-exception-caught logging.exception('Could not rollback transaction on layer "%s"', layername) - else: - logging.info('Updated layer "%s" has new fingerprint %s', - layername, fingerprint.hex()[:8]) + else: + logging.info('Updated layer "%s" has identical fingerprint %s', + layername, fingerprint.hex()[:8]) # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead if cache.SetFeature(feature) != ogr.OGRERR_NONE: @@ -974,110 +1044,3 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, # rollback if needed ds.FlushCache() return ret - -SERIALIZE_BOOL : Final = struct.Struct('@?').pack -SERIALIZE_SHORT : Final = struct.Struct('@h').pack -SERIALIZE_USHORT : Final = struct.Struct('@H').pack -SERIALIZE_LONG : Final = struct.Struct('@l').pack -SERIALIZE_ULONG : Final = struct.Struct('@L').pack -SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack -SERIALIZE_FLOAT : Final = struct.Struct('@f').pack -SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack -SERIALIZE_DATE : Final = struct.Struct('@HBB').pack -SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack -SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack - -def _fingerprintBinary(m, v : bytes) -> None: - m.update(SERIALIZE_USHORT(len(v))) - m.update(v) -def _fingerprintDate(m, f : ogr.Feature, i : int) -> None: - year, month, day, *_ = f.GetFieldAsDateTime(i) - m.update(SERIALIZE_DATE(year, month, day)) -def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None: - year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i) - m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz)) -def _fingerprintTime(m, f : ogr.Feature, i : int) -> None: - _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i) - m.update(SERIALIZE_TIME(hour, minute, second, tz)) -def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None: - v = bytes.fromhex(f.GetFieldAsString(i).replace('-', '')) - #assert len(v) == 16 - m.update(v) - -def fingerprintLayerDefn(m, lyr : ogr.Layer): - """Fingerprint a layer definition (schema) and return the - fingerprinting recipe mapping for its features.""" - defn = lyr.GetLayerDefn() - - n = defn.GetFieldCount() - if n > 65535: - raise RuntimeError(f'Too many fields {n}') - m.update(SERIALIZE_USHORT(n)) - - ret = [None] * n - for i in range(n): - defn2 = defn.GetFieldDefn(i) - _fingerprintBinary(m, defn2.GetName().encode('utf-8')) - - fldType = defn2.GetType() - m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13 - - fldSubType = defn2.GetSubType() - m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5 - - m.update(SERIALIZE_LONG(defn2.GetWidth())) - - if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone: - ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j))) - elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean: - ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j))) - elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16: - ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j))) - elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone: - ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j))) - elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32: - ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j))) - elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone: - ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j)) - elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID: - ret[i] = _fingerprintUUID - elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone: - ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j)) - elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone: - ret[i] = _fingerprintDate - elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone: - ret[i] = _fingerprintTime - elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone: - ret[i] = _fingerprintDateTime - elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone: - ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j))) - - if ret[i] is None: - raise RuntimeError('Serialization not implemented for type ' - + ogr.GetFieldTypeName(fldType) + ', subtype ' - + ogr.GetFieldSubTypeName(fldSubType)) - - # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2() - m.update(SERIALIZE_LONG(lyr.GetGeomType())) - return tuple(ret) - -def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint - """Fingerprint a feature using the given fingerprinting recipe.""" - # TODO bitmap for nulls - for i, f in enumerate(x): - if not feature.IsFieldSet(i): - #raise RuntimeError(f'Field #{i} - # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"' - # 'is not set.') - m.update(SERIALIZE_BOOL(False)) - elif feature.IsFieldNull(i): - # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there - # are no builtin functions for that and since we're hashing anyway it's not worth - # the trouble - m.update(SERIALIZE_BOOL(False)) - else: - m.update(SERIALIZE_BOOL(True)) - f(m, feature, i) - wkb = feature.GetGeometryRef().ExportToIsoWkb() - m.update(SERIALIZE_ULONG(len(wkb))) - m.update(wkb) |