author | Guilhem Moulin <guilhem@fripost.org> | 2025-04-23 17:55:57 +0200
---|---|---
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-04-24 16:49:37 +0200
commit | 80882acd6ba407847fed0ef308e440b88796e0e1 (patch) |
tree | 7084e4508c9ad3aa72c034735e8cbc85738af93c /webmap-import |
parent | c42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff) |
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes.
Comparing modification times is not enough since some sources (for
instance Naturvårdsverket's SCI_Rikstackande) are updated on the server
even though no objects are being added; the source layer remains
unchanged but the file differs because of OBJECTID changes we are not
interested in.
Rather than adding another cache layer/table for fingerprints, we key the
cache on destination layernames instead of triplets (source_path,
archive_member, layername), and store the time at which the import was
started instead of source_path's mtime.
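
Concretely, the new cache is a plain attribute table with two fields and no
geometry, matching the `fields` spec validated by validateCacheLayer() in the
diff below. Here is a minimal sketch of how such a table could be created with
the GDAL/OGR Python bindings; `createLayerCache` is a hypothetical helper, not
part of this commit (which only validates a pre-existing table):

```python
from osgeo import ogr

def createLayerCache(ds, name: str) -> ogr.Layer:
    """Create a cache table with one row per destination layer."""
    lyr = ds.CreateLayer(name, geom_type=ogr.wkbNone)  # no geometry field

    fld = ogr.FieldDefn('layername', ogr.OFTString)
    fld.SetWidth(255)
    fld.SetNullable(False)  # NOT NULL, needs GDAL >= 2.0
    fld.SetUnique(True)     # UNIQUE constraint, needs GDAL >= 3.2
    lyr.CreateField(fld)

    fld = ogr.FieldDefn('last_updated', ogr.OFTDateTime)
    fld.SetNullable(False)
    lyr.CreateField(fld)
    return lyr
```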
There is indeed no value in keeping source_path's exact mtime in the cache:
all we need is a way to detect whether a source path has been updated since
the previous run. Thanks to the shared locks, the ctime of any updated source
path will be at least the time at which the locks are released, thereby
exceeding the last_updated value.
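
The staleness test thus reduces to one stat(2) and one comparison per source
path. A sketch of the idea (`is_stale` is a hypothetical helper; the
nanosecond arithmetic mirrors areSourceFilesNewer() in the diff below):

```python
import os
from datetime import datetime

def is_stale(path: str, last_updated: datetime) -> bool:
    """Return True if path was modified or changed after last_updated."""
    st = os.stat(path)
    # ctime moves on any inode change, so a source path that was replaced
    # while we held a shared lock still compares greater than last_updated
    t_ns = max(st.st_mtime_ns, st.st_ctime_ns)
    return t_ns > int(last_updated.timestamp() * 1_000_000) * 1000
```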
Diffstat (limited to 'webmap-import')
-rwxr-xr-x | webmap-import | 280 |
1 file changed, 148 insertions, 132 deletions
```diff
diff --git a/webmap-import b/webmap-import
index fd10705..0bf9c9e 100755
--- a/webmap-import
+++ b/webmap-import
@@ -28,7 +28,8 @@ from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
-from datetime import datetime
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
@@ -65,7 +66,7 @@ from import_source import (
     validateOutputLayer,
     clearLayer,
     importSources,
-    getSourceCacheKey
+    updateLayerCache
 )

 def setFieldIf(cond : bool,
@@ -284,13 +285,14 @@ def validate_sources(layers : dict[str, Any]) -> None:
     for layername in toremove:
         layers.pop(layername)

-def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
-                             name : str,
-                             typ : int,
-                             subtyp : int = ogr.OFSTNone,
-                             width : int = 0,
-                             isNullable : Optional[bool] = None) -> bool:
-    """Validate field #idx from the source cache layer/table."""
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+                            name : str,
+                            typ : int,
+                            subtyp : int = ogr.OFSTNone,
+                            width : int = 0,
+                            unique : Optional[bool] = None,
+                            nullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the layer cache table."""
     n = defn.GetFieldCount()
     if idx >= n:
         return False
@@ -299,173 +301,175 @@ def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
     b = True
     name2 = defn.GetName()
     if name2 != name:
-        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
-                        idx, name2, name)
+        logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
         b = False
-    if isNullable is not None and defn.IsNullable() != isNullable:
+    if nullable is not None and defn.IsNullable() != nullable:
         # non-fatal
-        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
-                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+        logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+    if unique is not None and defn.IsUnique() != unique:
+        # non-fatal
+        logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+                        idx, name2, 'is' if defn.IsUnique() else 'isn\'t')

     typ2 = defn.GetType()
     if typ2 != typ:
-        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
-                        idx, name2,
+        logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
                         ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
         b = False

     subtyp2 = defn.GetSubType()
     if subtyp2 != subtyp:
-        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
-                        'ignoring cache', idx, name2,
+        logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
                         ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
         b = False

     width2 = defn.GetWidth()
     if width2 != 0 and (width == 0 or width2 < width):
         # non-fatal
-        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+        logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
                         idx, name2, width2, width)
     return b

-def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
-                        force : bool = False) -> ogr.Layer|None:
-    """Get the source cache layer/table and validate it."""
-    if name is None:
-        return None
-
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+    """Validate layer cache table."""
     lyr = ds.GetLayerByName(name)
     if lyr is None:
-        if not force: # show a warning if args.force is not set
-            logging.warning('Table "%s" does not exist, implying --force', name)
-        return None
+        logging.warning('Table "%s" does not exist', name)
+        return False

 #    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
-#            gdalVersionMin(maj=3, min=9) and
+#            gdalVersionMin(maj=3, min=7) and
 #            lyr.TestCapability(ogr.OLCUpdateFeature)):
-#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
-#        # which was added in 3.9
 #        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
 #                        'ignoring cache', name)
 #        return None

     defn = lyr.GetLayerDefn()
     fields = [
-        { 'name': 'source_path', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 2047 },
-        { 'name': 'archive_member', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 2047 },
         { 'name': 'layername', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 255 },
-        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
-          'isNullable': True },
+          'nullable': False, 'unique': True, 'width': 255 },
+        { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+          'nullable': False },
     ]
     m = len(fields)
     n = defn.GetFieldCount()
     if n < m:
-        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
-                        name, n, m)
+        # this is fatal, and `all(bs)` is False so we return False below
+        logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
     elif n != m:
-        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
-    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
-        return None
+        logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+    bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+    if not all(bs):
+        return False

     n = defn.GetGeomFieldCount()
     if n > 0:
         geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                            for i in range(n) ]
-        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+        logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
                         name, n, ', '.join(geomFieldNames))

     if gdalVersionMin(maj=3, min=5):
         style = lyr.GetStyleTable()
         if style is not None:
-            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+            logging.warning('Layer cache "%s" has a style table "%s"',
                             name, style.GetLastStyleName())
-    return lyr
-
-def getSourcesMtimeNS(sources : dict[str,Any],
-                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
-    """Return a dictionary mapping each source path to its last modification
-    time (in nanoseconds), or None if said source path is not a regular file."""
-    mtimes_ns = {}
+    return True
+
+def areSourceFilesNewer(layername : str,
+                        sources : dict[str,Any],
+                        cache : ogr.Layer,
+                        cachedir : Optional[Path] = None) -> bool:
+    """Return a boolean indicating whether the layer cache is up to date with
+    respect to the source files found on disk.  That is, the last modification
+    and last changed time of each source file needs to be equal or lower than
+    the `last_updated` value found in the layer cache."""
+
+    source_paths = set()
     for source in sources:
         # the same source_path can be used multiple times, stat(2) only once
         source_path = source['source']['path']
-        mtimes_ns[source_path] = None
-    for source_path in mtimes_ns:
+        source_paths.add(source_path)
+    if len(source_paths) == 0:
+        return False
+
+    t = None
+    mtimes_ns = {}
+    for source_path in source_paths:
         path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
         try:
             st = os.stat(path)
             if not S_ISREG(st.st_mode):
                 raise FileNotFoundError
             mtimes_ns[source_path] = st.st_mtime_ns
+            # take max(mtime, ctime): if we lock source paths any update after
+            # aquiring the lock will yield a value larger than time.time_ns()
+            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+            if t is None or t < t2:
+                t = t2
         except (OSError, ValueError):
             #logging.warning('Could not stat(%s)', path)
-            pass
-    return mtimes_ns
-
-def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
-                           mtimes_ns : dict[str,int|None]) -> bool:
-    """Return a boolean indicating whether the source cache layer/table is up to
-    date with respect to the dictionary mapping each source path to its last
-    modification time.  That is, every triplet (source_path, archive_member,
-    layername) needs to be present in cache, and the corresponding mtime_ns
-    needs to match the stat(2) output of the source file on disk."""
-    ks = set()
-    for source in sources:
-        source_path, archive_member, layername = k = getSourceCacheKey(source)
-        if k in ks:
-            raise BadConfiguration(f'Duplicate key {k}')
-        ks.add(k)
+            return True
+    assert t is not None

-    if len(ks) == 0:
-        return False
+    attributeFilter = 'layername = ' + escape_literal_str(layername)
+    logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+    cache.SetAttributeFilter(attributeFilter)

-    attributeFilter = []
-    for source_path, archive_member, layername in ks:
-        attributeFilter.append('(' + escape_literal_str(source_path) + ','
-                                   + escape_literal_str(archive_member) + ','
-                                   + escape_literal_str(layername) + ')')
-    if len(attributeFilter) == 1:
-        attributeFilter = '= ' + attributeFilter[0]
+    feature = cache.GetNextFeature()
+    if feature is None:
+        # not in cache
+        return True
+
+    if not feature.IsFieldSetAndNotNull(1):
+        ret = True
     else:
-        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
-    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
-    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
-    lyr.SetAttributeFilter(attributeFilter)
-
-    cache = {}
-    feature = lyr.GetNextFeature()
-    while feature is not None:
-        k = (
-            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
-            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
-            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+        # [ year, month, day, hour, minute, second, timezone flag ]
+        dt = feature.GetFieldAsDateTime(1)
+        if not gdalVersionMin(maj=3, min=8):
+            tz = None # assume local time
+        elif dt[6] == ogr.TZFLAG_UNKNOWN:
+            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+                            'field #%d "%s", assuming local time', 1,
+                            feature.GetDefnRef().GetFieldDefn(1).GetName())
+            tz = None
+        elif dt[6] == ogr.TZFLAG_LOCALTIME:
+            tz = None
+        elif dt[6] == ogr.TZFLAG_UTC:
+            tz = UTC
+        else:
+            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+        ms, s = modf(dt[5])
+        dt = datetime(
+            year=dt[0],    # including century
+            month=dt[1],   # 01 ≤ year ≤ 12
+            day=dt[2],     # 01 ≤ day ≤ 31
+            hour=dt[3],    # 00 ≤ hour ≤ 23
+            minute=dt[4],  # 00 ≤ minute ≤ 59
+            second=int(s), # 00 ≤ second ≤ 59
+            microsecond=round(ms*1000000),
+            tzinfo=tz
         )
-        if any(k0 is None for k0 in k):
-            # no column in the the key may be NULL
-            raise RuntimeError(f'Bad key {k}')
-        if k in cache:
-            raise RuntimeError(f'Duplicate key {k}')
-        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
-        feature = lyr.GetNextFeature()
-
-    for k in ks:
-        a = cache.get(k, None)
-        b = mtimes_ns[k[0]]
-        if a is None or b is None or a != b:
-            return True
-
-    for source_path in sorted({ p for p,_,_ in ks }):
-        # XXX datetime.fromtimestamp() doesn't support nanosecond input
-        # https://github.com/python/cpython/issues/59648
-        mtime = (mtimes_ns[source_path] // 1000)/1000000.
-        dt = datetime.fromtimestamp(mtime)
-        logging.info('Source file %s is unchanged (last modification time %s)',
-                     source_path, dt.astimezone().isoformat(timespec='seconds'))
-    return False
+        logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername,
+                      dt.isoformat(timespec='microseconds'))
+        ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+    if cache.GetNextFeature() is not None:
+        raise RuntimeError(f'Duplicate key {layername}')
+
+    if not ret:
+        for source_path, mtime_ns in sorted(mtimes_ns.items()):
+            # XXX datetime.fromtimestamp() doesn't support nanosecond input
+            # https://github.com/python/cpython/issues/59648
+            mtime = (mtime_ns // 1000) / 1000000.
+            dt = datetime.fromtimestamp(mtime)
+            logging.info('Source file %s is unchanged (last modification time %s)',
                         source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return ret

 def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
     """Place shared locks on each source path and return their respective file
@@ -611,6 +615,7 @@ def main() -> NoReturn:
                                        lockdir=args.lockdir_sources)

     if (dso.TestCapability(ogr.ODsCTransactions) and
+        # we need SAVEPOINT support
         dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
         logging.debug('Starting transaction')
         dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
@@ -619,10 +624,13 @@
                       dso.GetDriver().ShortName)
         dsoTransaction = False

-    # get source cache layer/table
-    lyr_sourcecache = getSourceCacheLayer(dso,
-                                          config['dataset'].get('sourcecache-layername', None),
-                                          force=args.force)
+    # validate layer cache
+    lyr_cache = config['dataset'].get('layercache', None)
+    if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+        if not args.force:
+            logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+            args.force = True
+        lyr_cache = None

     bChanged = False
     rv = 0
@@ -637,14 +645,12 @@
                              'CreateFeature() method')

         sources = layerdef['sources']
-        if lyr_sourcecache is None:
-            sourcesMtimeNS = None
-        else:
-            sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
-            if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
-                                                         mtimes_ns=sourcesMtimeNS)):
-                logging.info('Output layer "%s" is up to date, skipping', layername)
-                continue
+        if not (lyr_cache is None or args.force or
+                areSourceFilesNewer(layername, sources=sources,
+                                    cache=dso.GetLayerByName(lyr_cache),
+                                    cachedir=cachedir)):
+            logging.info('Output layer "%s" is up to date, skipping', layername)
+            continue

         validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

@@ -665,17 +671,28 @@
             lyrTransaction = False

         try:
-            clearLayer(dso, lyr) # TODO conditional (only if not new)?
-
             description = layerdef.get('description', None)
             if (description is not None and
                     lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                 logging.warning('Could not set description metadata')

-            bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
-                                      mtimes=sourcesMtimeNS,
-                                      lyr_sourcecache=lyr_sourcecache,
-                                      cachedir=cachedir, extent=extent)
+            now = datetime.now().astimezone()
+            clearLayer(dso, lyr) # TODO conditional (only if not new)?
+            importSources(lyr_dst=lyr, sources=sources,
+                          cachedir=cachedir, extent=extent)
+
+            # force the PG driver to call EndCopy() to detect errors and trigger a
+            # rollback if needed
+            dso.FlushCache()
+
+            if lyr_cache is None:
+                bChanged0 = True
+            else:
+                updateLayerCache(layername=layername,
+                                 cache=dso.GetLayerByName(lyr_cache),
+                                 last_updated=now)
+                bChanged0 = True # TODO fingerprint lyr to detect changes
+                dso.FlushCache()
             bChanged = bChanged or bChanged0

             if isinstance(lyrTransaction, str):
@@ -732,7 +749,6 @@
         sys.exit(1)

     finally:
-        lyr_sourcecache = None
         dso = None
         srs = None
         extent = None
```
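
As a side note, the datetime handling in areSourceFilesNewer() can be
exercised in isolation. The sketch below (`ogr_field_datetime` is a
hypothetical standalone function, not part of the commit) assumes the
GDAL ≥ 3.8 Python bindings, where the ogr.TZFLAG_* constants are exposed and
timezone flags count in 15-minute steps relative to ogr.TZFLAG_UTC, plus
Python ≥ 3.11 for datetime.UTC:

```python
from datetime import datetime, timedelta, timezone, UTC
from math import modf

from osgeo import ogr

def ogr_field_datetime(dt: list) -> datetime:
    """Convert the [y, m, d, h, m, s(float), tzflag] list returned by
    ogr.Feature.GetFieldAsDateTime() into a Python datetime."""
    if dt[6] in (ogr.TZFLAG_UNKNOWN, ogr.TZFLAG_LOCALTIME):
        tz = None  # naive datetime, interpreted as local time
    elif dt[6] == ogr.TZFLAG_UTC:
        tz = UTC
    else:
        # each unit above/below TZFLAG_UTC is a 15-minute (900 s) offset
        tz = timezone(timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
    ms, s = modf(dt[5])  # split float seconds into fractional/integral parts
    return datetime(dt[0], dt[1], dt[2], dt[3], dt[4], int(s),
                    microsecond=round(ms * 1_000_000), tzinfo=tz)

# e.g. tzflag 108 is 8 * 15 min east of UTC, i.e. +02:00
print(ogr_field_datetime([2025, 4, 24, 16, 49, 37.5, 108]).isoformat())
# -> 2025-04-24T16:49:37.500000+02:00
```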