aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py205
1 files changed, 188 insertions, 17 deletions
diff --git a/import_source.py b/import_source.py
index 2d2f116..c4cb96f 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,8 +26,10 @@ import re
from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
-from typing import Any, Optional
+from typing import Any, Final, Optional
import traceback
+from hashlib import sha256
+import struct
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -439,7 +441,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
cachedir : Path|None = None,
extent : ogr.Geometry|None = None,
dsoTransaction : bool = False,
- lyrcache : ogr.Layer|None = None) -> bool:
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> bool:
"""Clear lyr and import source layers to it."""
layername = lyr.GetName()
@@ -461,6 +464,8 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
rv = True
now = datetime.now().astimezone()
+ fingerprint = sha256()
+ fingerprint_map = fingerprintLayerDefn(fingerprint, lyr)
try:
clearLayer(dso, lyr) # TODO conditional (only if not new)?
@@ -468,17 +473,26 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
_importSource(lyr, **source['source'],
args=source['import'],
cachedir=cachedir,
- extent=extent)
+ extent=extent,
+ fingerprint=fingerprint,
+ fingerprint_map=fingerprint_map)
+
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
dso.FlushCache()
+ fingerprint = fingerprint.digest()
if lyrcache is not None:
- updateLayerCache(layername=layername,
- cache=lyrcache,
- ds=dso,
- savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
- last_updated=now)
+ if not updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso, lyr=lyr,
+ force=force,
+ lyrTransaction=lyrTransaction,
+ last_updated=now,
+ fingerprint=fingerprint):
+ if isinstance(lyrTransaction, bool):
+ # the transaction on lyr was already rolled back
+ lyrTransaction = False
except Exception: # pylint: disable=broad-exception-caught
rv = False
@@ -528,11 +542,13 @@ def _importSource(lyr : ogr.Layer,
unar : dict[str,Any]|None = None,
args : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
+ extent : ogr.Geometry|None = None,
+ fingerprint = None, fingerprint_map = None) -> None:
"""Import a source layer"""
if unar is None:
return _importSource2(lyr, path, args=args,
- basedir=cachedir, extent=extent)
+ basedir=cachedir, extent=extent,
+ fingerprint=fingerprint, fingerprint_map=fingerprint_map)
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
@@ -549,7 +565,8 @@ def _importSource(lyr : ogr.Layer,
patterns=unar.get('patterns', None),
exact_matches=[ds_srcpath])
return _importSource2(lyr, ds_srcpath, args=args,
- basedir=Path(tmpdir), extent=extent)
+ basedir=Path(tmpdir), extent=extent,
+ fingerprint=fingerprint, fingerprint_map=fingerprint_map)
def setFieldMapValue(fld : ogr.FieldDefn,
idx : int,
@@ -581,7 +598,8 @@ def setFieldMapValue(fld : ogr.FieldDefn,
# pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
- basedir : Path|None, extent : ogr.Geometry|None) -> None:
+ basedir : Path|None, extent : ogr.Geometry|None,
+ fingerprint, fingerprint_map) -> None:
"""Import a source layer (already extracted)
This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
out our own (slower) version because GDALVectorTranslate() insists in
@@ -638,7 +656,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
defn = lyr.GetLayerDefn()
geomFieldCount = defn.GetGeomFieldCount()
- if geomFieldCount != 1: # TODO Add support for multiple geometry fields
+ if geomFieldCount != 1:
+ # TODO Add support for multiple geometry fields (also in the fingerprinting logic below)
logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
fieldCount = defn.GetFieldCount()
@@ -821,6 +840,8 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
feature2.SetGeometryDirectly(geom)
+ fingerprintFeature(fingerprint, feature2, fingerprint_map)
+
if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
@@ -847,10 +868,14 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
- last_updated : datetime,
- savepoint : str|None = None) -> None:
- """Update attributes in the layer cache for the given layer name."""
+def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
+ layername : str, cache : ogr.Layer,
+ last_updated : datetime, fingerprint : bytes,
+ lyrTransaction : str|bool|None = None,
+ force : bool = False) -> bool:
+ """Update attributes in the layer cache for the given layer name.
+ Return a boolean indicating whether changes to layername were *not*
+ rolled back (hence might still be outstanding in the transaction)."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
cache.SetAttributeFilter(attributeFilter)
@@ -862,9 +887,11 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
update = False
feature = ogr.Feature(cache.GetLayerDefn())
feature.SetFieldString(0, layername)
+ fingerprint_old = None
else:
logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
update = True
+ fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
assert cache.GetNextFeature() is None
if not gdalVersionMin(maj=3, min=8):
@@ -887,8 +914,44 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
last_updated.minute,
float(last_updated.second) + float(last_updated.microsecond)/1000000.,
tzFlag)
+ if fingerprint is None:
+ feature.SetFieldNull(2)
+ else:
+ # https://lists.osgeo.org/pipermail/gdal-dev/2020-December/053170.html
+ feature.SetFieldBinaryFromHexString(2, fingerprint.hex())
+ ret = True
if update:
+ if not (force or fingerprint is None or fingerprint_old is None or
+ fingerprint != fingerprint_old):
+ # no change: rollback (*before* updating the cache) if possible to retain FID values
+ if isinstance(lyrTransaction, str):
+ logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
+ layername, fingerprint.hex()[:8])
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ ds.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ else:
+ ret = False
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
+ layername, fingerprint.hex()[:8])
+ try:
+ if lyr.RollbackTransaction() == ogr.OGRERR_NONE:
+ ret = False
+ else:
+ logging.error('Could not rollback transaction on layer "%s"',
+ layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"',
+ layername)
+ else:
+ logging.info('Updated layer "%s" has new fingerprint %s',
+ layername, fingerprint.hex()[:8])
+
# TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
if cache.SetFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not update feature in layer cache')
@@ -899,3 +962,111 @@ def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
ds.FlushCache()
+ return ret
+
+SERIALIZE_BOOL : Final = struct.Struct('@?').pack
+SERIALIZE_SHORT : Final = struct.Struct('@h').pack
+SERIALIZE_USHORT : Final = struct.Struct('@H').pack
+SERIALIZE_LONG : Final = struct.Struct('@l').pack
+SERIALIZE_ULONG : Final = struct.Struct('@L').pack
+SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack
+SERIALIZE_FLOAT : Final = struct.Struct('@f').pack
+SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack
+SERIALIZE_DATE : Final = struct.Struct('@HBB').pack
+SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack
+SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack
+
+def _fingerprintBinary(m, v : bytes) -> None:
+ m.update(SERIALIZE_USHORT(len(v)))
+ m.update(v)
+def _fingerprintDate(m, f : ogr.Feature, i : int) -> None:
+ year, month, day, *_ = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_DATE(year, month, day))
+def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None:
+ year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))
+def _fingerprintTime(m, f : ogr.Feature, i : int) -> None:
+ _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
+ m.update(SERIALIZE_TIME(hour, minute, second, tz))
+def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None:
+ v = bytes.fromhex(f.GetFieldAsString(i).replace('-', ''))
+ #assert len(v) == 16
+ m.update(v)
+
+def fingerprintLayerDefn(m, lyr : ogr.Layer):
+ """Fingerprint a layer definition (schema) and return the
+ fingerprinting recipe mapping for its features."""
+ defn = lyr.GetLayerDefn()
+
+ n = defn.GetFieldCount()
+ if n > 65535:
+ raise RuntimeError(f'Too many fields {n}')
+ m.update(SERIALIZE_USHORT(n))
+
+ ret = [None] * n
+ for i in range(n):
+ defn2 = defn.GetFieldDefn(i)
+ _fingerprintBinary(m, defn2.GetName().encode('utf-8'))
+
+ fldType = defn2.GetType()
+ m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13
+
+ fldSubType = defn2.GetSubType()
+ m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5
+
+ m.update(SERIALIZE_LONG(defn2.GetWidth()))
+
+ if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
+ elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
+ elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
+ elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
+ elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
+ ret[i] = _fingerprintUUID
+ elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
+ elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintDate
+ elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintTime
+ elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
+ ret[i] = _fingerprintDateTime
+ elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
+ ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))
+
+ if ret[i] is None:
+ raise RuntimeError('Serialization not implemented for type '
+ + ogr.GetFieldTypeName(fldType) + ', subtype '
+ + ogr.GetFieldSubTypeName(fldSubType))
+
+ # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
+ m.update(SERIALIZE_LONG(lyr.GetGeomType()))
+ return tuple(ret)
+
+def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint
+ """Fingerprint a feature using the given fingerprinting recipe."""
+ # TODO bitmap for nulls
+ for i, f in enumerate(x):
+ if not feature.IsFieldSet(i):
+ #raise RuntimeError(f'Field #{i}
+ # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"'
+ # 'is not set.')
+ m.update(SERIALIZE_BOOL(False))
+ elif feature.IsFieldNull(i):
+ # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there
+ # are no builtin functions for that and since we're hashing anyway it's not worth
+ # the trouble
+ m.update(SERIALIZE_BOOL(False))
+ else:
+ m.update(SERIALIZE_BOOL(True))
+ f(m, feature, i)
+ wkb = feature.GetGeometryRef().ExportToIsoWkb()
+ m.update(SERIALIZE_ULONG(len(wkb)))
+ m.update(wkb)