aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py243
1 files changed, 103 insertions, 140 deletions
diff --git a/import_source.py b/import_source.py
index 647c79e..6fe4acf 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,9 +26,9 @@ import re
from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
-from typing import Any, Final, Optional
+from typing import Any, Final, Iterator, Optional
import traceback
-from enum import Enum, unique
+from enum import Enum, unique as enum_unique
from hashlib import sha256
import struct
@@ -436,7 +436,7 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
-@unique
+@enum_unique
class ImportStatus(Enum):
"""Return value for importSources(): success, error, or no-change."""
IMPORT_SUCCESS = 0
@@ -472,8 +472,6 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
rv = ImportStatus.IMPORT_NOCHANGE
now = datetime.now().astimezone()
- fingerprint = sha256()
- fingerprint_map = fingerprintLayerDefn(fingerprint, lyr)
try:
clearLayer(dso, lyr) # TODO conditional (only if not new)?
@@ -481,24 +479,19 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
_importSource(lyr, **source['source'],
args=source['import'],
cachedir=cachedir,
- extent=extent,
- fingerprint=fingerprint,
- fingerprint_map=fingerprint_map)
+ extent=extent)
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
dso.FlushCache()
- fingerprint = fingerprint.digest()
if lyrcache is None:
rv = ImportStatus.IMPORT_SUCCESS
- elif updateLayerCache(layername=layername,
- cache=lyrcache,
+ elif updateLayerCache(cache=lyrcache,
ds=dso, lyr=lyr,
force=force,
lyrTransaction=lyrTransaction,
- last_updated=now,
- fingerprint=fingerprint):
+ last_updated=now):
rv = ImportStatus.IMPORT_SUCCESS
else:
rv = ImportStatus.IMPORT_NOCHANGE
@@ -553,13 +546,10 @@ def _importSource(lyr : ogr.Layer,
unar : dict[str,Any]|None = None,
args : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None,
- fingerprint = None, fingerprint_map = None) -> None:
+ extent : ogr.Geometry|None = None) -> None:
"""Import a source layer"""
if unar is None:
- return _importSource2(lyr, path, args=args,
- basedir=cachedir, extent=extent,
- fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+ return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent)
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
@@ -575,9 +565,7 @@ def _importSource(lyr : ogr.Layer,
fmt=unar.get('format', None),
patterns=unar.get('patterns', None),
exact_matches=[ds_srcpath])
- return _importSource2(lyr, ds_srcpath, args=args,
- basedir=Path(tmpdir), extent=extent,
- fingerprint=fingerprint, fingerprint_map=fingerprint_map)
+ return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent)
def setFieldMapValue(fld : ogr.FieldDefn,
idx : int,
@@ -609,8 +597,7 @@ def setFieldMapValue(fld : ogr.FieldDefn,
# pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
- basedir : Path|None, extent : ogr.Geometry|None,
- fingerprint, fingerprint_map) -> None:
+ basedir : Path|None, extent : ogr.Geometry|None) -> None:
"""Import a source layer (already extracted)
This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
out our own (slower) version because GDALVectorTranslate() insists in
@@ -851,8 +838,6 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
feature2.SetGeometryDirectly(geom)
- fingerprintFeature(fingerprint, feature2, fingerprint_map)
-
if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
@@ -879,14 +864,94 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
- layername : str, cache : ogr.Layer,
- last_updated : datetime, fingerprint : bytes,
+def listFieldsOrderBy(defn : ogr.FeatureDefn,
+ unique : bool|None = None,
+ nullable : bool|None = None) -> Iterator[str]:
+ """Return an iterator of column names suitable for ORDER BY."""
+ fields_str = {}
+ for i in range(defn.GetFieldCount()):
+ fld = defn.GetFieldDefn(i)
+ if (fld.IsIgnored() or
+            # if 'unique' or 'nullable' is not None then skip the field
+ # unless the boolean matches .IsUnique() resp. .IsNullable()
+ not (unique is None or fld.IsUnique() == unique) or
+ not (nullable is None or fld.IsNullable() == nullable)):
+ continue
+ if fld.GetType() in (ogr.OFTInteger, ogr.OFTInteger64):
+ # list integers first
+ yield fld.GetName()
+ elif fld.GetType() == ogr.OFTString:
+ w = fld.GetWidth()
+ if 0 < w < 256:
+ # only consider short-ish strings
+ fields_str[fld.GetName()] = w
+ # order string columns by width
+ for c,_ in sorted(fields_str.items(), key=lambda x:x[1]):
+ yield c
+
+# pylint: disable-next=too-many-branches, too-many-statements
+def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
+ last_updated : datetime,
lyrTransaction : str|bool|None = None,
force : bool = False) -> bool:
"""Update attributes in the layer cache for the given layer name.
Return a boolean indicating whether changes to layername were *not*
rolled back (hence might still be outstanding in the transaction)."""
+ layername = lyr.GetName()
+
+ dgst = sha256()
+ defn = lyr.GetLayerDefn()
+ fields = []
+ for i in range(defn.GetFieldCount()):
+ fields.append('t.' + escape_identifier(defn.GetFieldDefn(i).GetName()))
+ if len(fields) == 0:
+ fields = ['0 AS hash_properties']
+ else:
+ fields = [ 'hash_record_extended(ROW(' + ','.join(fields) + '),0) AS hash_properties' ]
+
+ fidColumn = lyr.GetFIDColumn()
+ if fidColumn is None or fidColumn == '':
+ raise RuntimeError(f'Couldn\'t find FID column for "{layername}"')
+ # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
+ geometryColumn = lyr.GetGeometryColumn()
+ if geometryColumn is None or geometryColumn == '':
+ raise RuntimeError(f'Couldn\'t find geometry column for "{layername}"')
+
+ fields.append('sha256(COALESCE(' +
+ 'ST_AsEWKB(t.' + escape_identifier(geometryColumn) + '),' +
+ '\'\')) AS hash_geom')
+ if len(fields) == 0:
+ raise RuntimeError('Empty field list in SELECT')
+ query = 'SELECT ' + ','.join(fields) + ' FROM ' + escape_identifier(layername) + ' t'
+
+ sort_by = next(listFieldsOrderBy(defn, unique=True, nullable=False), None)
+ if sort_by is not None:
+ sort_by = [ sort_by ]
+ else:
+ count = lyr.GetFeatureCount(force=0)
+ if count is None or count < 0 or count > 5000:
+ logging.warning('Layer "%s" has many (%s) features but no UNIQUE NOT NULL constraint, '
+ 'sorting might be unstable and slow', layername,
+ str(count) if (count is not None and count >= 0) else 'N/A')
+ sort_by = list(listFieldsOrderBy(defn)) + [ geometryColumn, fidColumn ]
+ query += ' ORDER BY ' + ','.join(['t.' + escape_identifier(c) for c in sort_by])
+
+ struct_dgst : Final = struct.Struct('@qq').pack
+ logging.debug('%s', query)
+ lyr2 = ds.ExecuteSQL(query)
+ try:
+ assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties'
+ assert lyr2.GetLayerDefn().GetFieldDefn(1).GetName() == 'hash_geom'
+ feature = lyr2.GetNextFeature()
+ while feature is not None:
+ dgst.update(struct_dgst(feature.GetFID(), feature.GetFieldAsInteger64(0)))
+ dgst.update(feature.GetFieldAsBinary(1))
+ feature = lyr2.GetNextFeature()
+ finally:
+ ds.ReleaseResultSet(lyr2)
+ lyr2 = None
+ fingerprint = dgst.digest()
+
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
cache.SetAttributeFilter(attributeFilter)
@@ -933,8 +998,13 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
ret = True
if update:
- if not (force or fingerprint is None or fingerprint_old is None or
- fingerprint != fingerprint_old):
+ if (fingerprint is None or fingerprint_old is None or fingerprint != fingerprint_old):
+ logging.info('Updated layer "%s" has new fingerprint %s', layername,
+ fingerprint.hex()[:8] if fingerprint is not None else 'N/A')
+ elif force:
+ logging.info('Updated layer "%s" has identical fingerprint %s',
+ layername, fingerprint.hex()[:8])
+ else:
# no change: rollback (*before* updating the cache) if possible to retain FID values
if isinstance(lyrTransaction, str):
logging.info('Updated layer "%s" has identical fingerprint %s, rolling back',
@@ -959,9 +1029,9 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
except Exception: # pylint: disable=broad-exception-caught
logging.exception('Could not rollback transaction on layer "%s"',
layername)
- else:
- logging.info('Updated layer "%s" has new fingerprint %s',
- layername, fingerprint.hex()[:8])
+ else:
+ logging.info('Updated layer "%s" has identical fingerprint %s',
+ layername, fingerprint.hex()[:8])
# TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
if cache.SetFeature(feature) != ogr.OGRERR_NONE:
@@ -974,110 +1044,3 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer,
# rollback if needed
ds.FlushCache()
return ret
-
-SERIALIZE_BOOL : Final = struct.Struct('@?').pack
-SERIALIZE_SHORT : Final = struct.Struct('@h').pack
-SERIALIZE_USHORT : Final = struct.Struct('@H').pack
-SERIALIZE_LONG : Final = struct.Struct('@l').pack
-SERIALIZE_ULONG : Final = struct.Struct('@L').pack
-SERIALIZE_LONGLONG : Final = struct.Struct('@q').pack
-SERIALIZE_FLOAT : Final = struct.Struct('@f').pack
-SERIALIZE_DOUBLE : Final = struct.Struct('@d').pack
-SERIALIZE_DATE : Final = struct.Struct('@HBB').pack
-SERIALIZE_DATETIME : Final = struct.Struct('@HBBBBfB').pack
-SERIALIZE_TIME : Final = struct.Struct('@BBfB').pack
-
-def _fingerprintBinary(m, v : bytes) -> None:
- m.update(SERIALIZE_USHORT(len(v)))
- m.update(v)
-def _fingerprintDate(m, f : ogr.Feature, i : int) -> None:
- year, month, day, *_ = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_DATE(year, month, day))
-def _fingerprintDateTime(m, f : ogr.Feature, i : int) -> None:
- year, month, day, hour, minute, second, tz = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_DATETIME(year, month, day, hour, minute, second, tz))
-def _fingerprintTime(m, f : ogr.Feature, i : int) -> None:
- _, _, _, hour, minute, second, tz = f.GetFieldAsDateTime(i)
- m.update(SERIALIZE_TIME(hour, minute, second, tz))
-def _fingerprintUUID(m, f : ogr.Feature, i : int) -> None:
- v = bytes.fromhex(f.GetFieldAsString(i).replace('-', ''))
- #assert len(v) == 16
- m.update(v)
-
-def fingerprintLayerDefn(m, lyr : ogr.Layer):
- """Fingerprint a layer definition (schema) and return the
- fingerprinting recipe mapping for its features."""
- defn = lyr.GetLayerDefn()
-
- n = defn.GetFieldCount()
- if n > 65535:
- raise RuntimeError(f'Too many fields {n}')
- m.update(SERIALIZE_USHORT(n))
-
- ret = [None] * n
- for i in range(n):
- defn2 = defn.GetFieldDefn(i)
- _fingerprintBinary(m, defn2.GetName().encode('utf-8'))
-
- fldType = defn2.GetType()
- m.update(SERIALIZE_USHORT(fldType)) # OFTMaxType = 13
-
- fldSubType = defn2.GetSubType()
- m.update(SERIALIZE_USHORT(fldSubType)) # OFSTMaxSubType = 5
-
- m.update(SERIALIZE_LONG(defn2.GetWidth()))
-
- if fldType == ogr.OFTInteger and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_LONG(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTBoolean:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_BOOL(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTInteger and fldSubType == ogr.OFSTInt16:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_SHORT(f.GetFieldAsInteger(j)))
- elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_DOUBLE(f.GetFieldAsDouble(j)))
- elif fldType == ogr.OFTReal and fldSubType == ogr.OFSTFloat32:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_FLOAT(f.GetFieldAsDouble(j)))
- elif fldType == ogr.OFTString and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
- elif fldType == ogr.OFTString and fldSubType == ogr.OFSTUUID:
- ret[i] = _fingerprintUUID
- elif fldType == ogr.OFTBinary and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: _fingerprintBinary(m, f.GetFieldAsBinary(j))
- elif fldType == ogr.OFTDate and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintDate
- elif fldType == ogr.OFTTime and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintTime
- elif fldType == ogr.OFTDateTime and fldSubType == ogr.OFSTNone:
- ret[i] = _fingerprintDateTime
- elif fldType == ogr.OFTInteger64 and fldSubType == ogr.OFSTNone:
- ret[i] = lambda m,f,j: m.update(SERIALIZE_LONGLONG(f.GetFieldAsInteger64(j)))
-
- if ret[i] is None:
- raise RuntimeError('Serialization not implemented for type '
- + ogr.GetFieldTypeName(fldType) + ', subtype '
- + ogr.GetFieldSubTypeName(fldSubType))
-
- # defn.GetGeomFieldCount() != 1 is not supported and yields a warning in _importSource2()
- m.update(SERIALIZE_LONG(lyr.GetGeomType()))
- return tuple(ret)
-
-def fingerprintFeature(m, feature : ogr.Feature, x) -> None: # XXX type hint
- """Fingerprint a feature using the given fingerprinting recipe."""
- # TODO bitmap for nulls
- for i, f in enumerate(x):
- if not feature.IsFieldSet(i):
- #raise RuntimeError(f'Field #{i}
- # "{feature.GetDefnRef().GetFieldDefn(i).GetName()}"'
- # 'is not set.')
- m.update(SERIALIZE_BOOL(False))
- elif feature.IsFieldNull(i):
- # we could use a bitmap to serialize in ⸢n/8⸣ bytes instead of n bytes, but there
- # are no builtin functions for that and since we're hashing anyway it's not worth
- # the trouble
- m.update(SERIALIZE_BOOL(False))
- else:
- m.update(SERIALIZE_BOOL(True))
- f(m, feature, i)
- wkb = feature.GetGeometryRef().ExportToIsoWkb()
- m.update(SERIALIZE_ULONG(len(wkb)))
- m.update(wkb)