From e5e8a6548ef156b785aae1b8a37fe71f26146061 Mon Sep 17 00:00:00 2001
From: Guilhem Moulin
Date: Sat, 19 Apr 2025 13:27:49 +0200
Subject: webmap-import: Add a cache layer and store the source file's last
 modification time.

That way we can avoid the expensive unpack+import when the source file(s)
have not been updated since the last run.  The check can be bypassed with a
new flag `--force`.

We use a sequence for the FIDs (primary key) and a UNIQUE constraint on
triplets (source_path, archive_member, layername), as GDAL doesn't support
multicolumn primary keys.

To avoid races between the stat(2) calls, gdal.OpenEx(), and updates via
`webmap-download` runs, we place a shared lock on the downloaded files.  One
could resort to some tricks to eliminate the race between the first two, but
there is also some value in having consistency during the entire execution of
the script (a single source file can be used by multiple layers for instance,
and it makes sense to use the very same file for all layers in that case).

We also intersperse dso.FlushCache() calls between _importSource() calls in
order to force the PG driver to call EndCopy(), so that errors are detected
and a rollback is triggered as soon as _importSource() fails.
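
For illustration only (not code from this change): the cache table could be
provisioned along the following lines.  The connection string and the table
name 'source_cache' are made up here (the real name comes from the new
`sourcecache-layername` configuration key), and the field list mirrors what
getSourceCacheLayer() below expects; the PostgreSQL driver creates a
sequence-backed FID column ("ogc_fid") on its own.

    from osgeo import gdal, ogr

    gdal.UseExceptions()
    # Hypothetical DSN; any vector dataset with update access would do.
    ds = gdal.OpenEx('PG:dbname=webmap', gdal.OF_VECTOR | gdal.OF_UPDATE)
    cache = ds.CreateLayer('source_cache', geom_type=ogr.wkbNone)
    for name, width in (('source_path', 2047), ('archive_member', 2047), ('layername', 255)):
        fld = ogr.FieldDefn(name, ogr.OFTString)
        fld.SetWidth(width)
        fld.SetNullable(False)
        cache.CreateField(fld)
    cache.CreateField(ogr.FieldDefn('mtime_ns', ogr.OFTInteger64))
    # GDAL cannot express a multicolumn primary key, hence a plain UNIQUE
    # constraint on the triplet, added directly in SQL.
    ds.ExecuteSQL('ALTER TABLE source_cache '
                  'ADD UNIQUE (source_path, archive_member, layername)')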
---
 webmap-import | 361 ++++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 326 insertions(+), 35 deletions(-)

diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
 # pylint: disable=invalid-name, missing-module-docstring, fixme
 
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
 import os
+from stat import S_ISREG
 import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
 from osgeo import gdalconst
 
 import common
-from common import BadConfiguration, escape_identifier
+from common import (
+    BadConfiguration,
+    escape_identifier,
+    escape_literal_str,
+    getSourcePathLockFileName
+)
 from common_gdal import (
     gdalVersionMin,
     gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
     createOutputLayer,
     validateOutputLayer,
     clearLayer,
-    importSources
+    importSources,
+    getSourceCacheKey
 )
 
 def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
     for layername in toremove:
         layers.pop(layername)
 
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+                             name : str,
+                             typ : int,
+                             subtyp : int = ogr.OFSTNone,
+                             width : int = 0,
+                             isNullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the source cache layer/table."""
+    n = defn.GetFieldCount()
+    if idx >= n:
+        return False
+    defn = defn.GetFieldDefn(idx)
+
+    b = True
+    name2 = defn.GetName()
+    if name2 != name:
+        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+                        idx, name2, name)
+        b = False
+
+    if isNullable is not None and defn.IsNullable() != isNullable:
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+    typ2 = defn.GetType()
+    if typ2 != typ:
+        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+                        idx, name2,
+                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+        b = False
+
+    subtyp2 = defn.GetSubType()
+    if subtyp2 != subtyp:
+        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+                        'ignoring cache', idx, name2,
+                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+        b = False
+
+    width2 = defn.GetWidth()
+    if width2 != 0 and (width == 0 or width2 < width):
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+                        idx, name2, width2, width)
+    return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+                        force : bool = False) -> ogr.Layer|None:
+    """Get the source cache layer/table and validate it."""
+    if name is None:
+        return None
+
+    lyr = ds.GetLayerByName(name)
+    if lyr is None:
+        if not force: # show a warning if args.force is not set
+            logging.warning('Table "%s" does not exist, implying --force', name)
+        return None
+
+#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+#            gdalVersionMin(maj=3, min=9) and
+#            lyr.TestCapability(ogr.OLCUpdateFeature)):
+#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+#        # which was added in 3.9
+#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+#                        'ignoring cache', name)
+#        return None
+
+    defn = lyr.GetLayerDefn()
+    fields = [
+        { 'name': 'source_path', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'archive_member', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'layername', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 255 },
+        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+          'isNullable': True },
+    ]
+    m = len(fields)
+    n = defn.GetFieldCount()
+    if n < m:
+        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+                        name, n, m)
+    elif n != m:
+        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+        return None
+
+    n = defn.GetGeomFieldCount()
+    if n > 0:
+        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+                           for i in range(n) ]
+        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+                        name, n, ', '.join(geomFieldNames))
+
+    if gdalVersionMin(maj=3, min=5):
+        style = lyr.GetStyleTable()
+        if style is not None:
+            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+                            name, style.GetLastStyleName())
+    return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
+    """Return a dictionary mapping each source path to its last modification
+    time (in nanoseconds), or None if said source path is not a regular file."""
+    mtimes_ns = {}
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        mtimes_ns[source_path] = None
+    for source_path in mtimes_ns:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            pass
+    return mtimes_ns
+
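As a standalone aside (not part of the patch): the cache stores st_mtime_ns
rather than the floating-point st_mtime because the integer value keeps the
full nanosecond resolution, while the float may round away sub-microsecond
digits and make the equality check below unreliable.

    import os, tempfile

    with tempfile.NamedTemporaryFile() as f:
        st = os.stat(f.name)
        print(st.st_mtime)     # float seconds, sub-microsecond digits may be rounded
        print(st.st_mtime_ns)  # exact integer nanoseconds, the value stored in mtime_ns
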
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+                           mtimes_ns : dict[str,int|None]) -> bool:
+    """Return a boolean indicating whether the source cache layer/table is up to
+    date with respect to the dictionary mapping each source path to its last
+    modification time.  That is, every triplet (source_path, archive_member,
+    layername) needs to be present in cache, and the corresponding mtime_ns
+    needs to match the stat(2) output of the source file on disk."""
+    ks = set()
+    for source in sources:
+        source_path, archive_member, layername = k = getSourceCacheKey(source)
+        if k in ks:
+            raise BadConfiguration(f'Duplicate key {k}')
+        ks.add(k)
+
+    if len(ks) == 0:
+        return False
+
+    attributeFilter = []
+    for source_path, archive_member, layername in ks:
+        attributeFilter.append('(' + escape_literal_str(source_path) + ','
+                                   + escape_literal_str(archive_member) + ','
+                                   + escape_literal_str(layername) + ')')
+    if len(attributeFilter) == 1:
+        attributeFilter = '= ' + attributeFilter[0]
+    else:
+        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+    lyr.SetAttributeFilter(attributeFilter)
+
+    cache = {}
+    feature = lyr.GetNextFeature()
+    while feature is not None:
+        k = (
+            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        )
+        if any(k0 is None for k0 in k):
+            # no column in the key may be NULL
+            raise RuntimeError(f'Bad key {k}')
+        if k in cache:
+            raise RuntimeError(f'Duplicate key {k}')
+        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+        feature = lyr.GetNextFeature()
+
+    for k in ks:
+        a = cache.get(k, None)
+        b = mtimes_ns[k[0]]
+        if a is None or b is None or a != b:
+            return True
+
+    for source_path in sorted({ p for p,_,_ in ks }):
+        # XXX datetime.fromtimestamp() doesn't support nanosecond input
+        # https://github.com/python/cpython/issues/59648
+        mtime = (mtimes_ns[source_path] // 1000)/1000000.
+        dt = datetime.fromtimestamp(mtime)
+        logging.info('Source file %s is unchanged (last modification time %s)',
+                     source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at
+    a time) to reduce the time during which the sources are prevented from
+    being updated/downloaded, but there is some value in having consistency
+    across the whole import process."""
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+
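The shared lock above only helps if the writer takes the matching exclusive
lock.  A minimal sketch of the assumed behaviour on the `webmap-download`
side (that script is not part of this patch): take LOCK_EX on the same
per-source lock file before replacing the downloaded file, so that import
runs holding LOCK_SH block updates and vice versa.

    import os
    from fcntl import flock, LOCK_EX, LOCK_UN

    def update_source(lockfile: str, refresh) -> None:
        # `refresh` is a placeholder callable that re-downloads the source file
        fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, mode=0o664)
        try:
            flock(fd, LOCK_EX)   # waits until no importer holds LOCK_SH
            refresh()
        finally:
            flock(fd, LOCK_UN)
            os.close(fd)
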
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+    """Release shared locks on the source paths.  Closed FDs are removed from
+    the dictionary in place."""
+    toremove = set()
+    for path, lock_fd in lock_fds.items():
+        try:
+            os.close(lock_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not release lock for %s', path)
+        else:
+            logging.debug('Released lock for %s', path)
+            toremove.add(path)
+    for path in toremove:
+        lock_fds.pop(path)
+
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+    """Either commit or rollback the transaction, depending on a condition
+    indicating whether there are pending new changes to be committed."""
+    if commit:
+        logging.debug('Committing transaction')
+        if obj.CommitTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not commit transaction')
+    else:
+        logging.info('No changes detected, rolling back transaction')
+        # don't call rollbackTransactionTry() as we don't want to catch exceptions
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    return False
+
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+    """Try to rollback the current transaction on the dataset or layer, ignoring
+    any exception but logging errors and returning a boolean indicating
+    success."""
+    try:
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    except Exception: # pylint: disable=broad-exception-caught
+        logging.exception('Could not rollback transaction')
+    return False
+
 # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
 def main() -> NoReturn:
     common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
     parser.add_argument('--debug', action='count', default=0,
                         help=argparse.SUPPRESS)
     parser.add_argument('--lockfile', default=None,
-                        help='obtain an exclusive lock before starting unpacking and importing')
+                        help='obtain an exclusive lock before processing')
+    parser.add_argument('--lockdir-sources', default=None,
+                        help='optional directory for lock files of source paths')
+    parser.add_argument('--force', default=False, action='store_true',
+                        help='import even if no new changes were detected')
     parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
     args = parser.parse_args()
@@ -313,8 +577,8 @@ def main() -> NoReturn:
     dso = openOutputDS(config['dataset'])
 
     validate_schema(layers,
-            drvo=dso.GetDriver(),
-            lco_defaults=config['dataset'].get('create-layer-options', None))
+                    drvo=dso.GetDriver(),
+                    lco_defaults=config['dataset'].get('create-layer-options', None))
 
     # get configured Spatial Reference System and extent
     srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
             createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
 
     cachedir = Path(args.cachedir) if args.cachedir is not None else None
+    if args.lockdir_sources is None:
+        sourcePathLocks = None
+    else:
+        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+                                          lockdir=args.lockdir_sources)
 
     if (dso.TestCapability(ogr.ODsCTransactions) and
         dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@
                         dso.GetDriver().ShortName)
         dsoTransaction = False
 
+    # get source cache layer/table
+    lyr_sourcecache = getSourceCacheLayer(dso,
+                                          config['dataset'].get('sourcecache-layername', None),
+                                          force=args.force)
+
+    bChanged = False
     rv = 0
     try:
        for layername, layerdef in layers.items():
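The cache lookup just above is driven by a new configuration key.  Assuming
the parsed configuration is the usual nested mapping (the table name below is
only illustrative), the relevant subtree would look roughly like:

    config = {
        'dataset': {
            # ... existing output dataset settings (connection, creation options, ...)
            'create-layer-options': None,
            'sourcecache-layername': 'source_cache',   # table used by getSourceCacheLayer()
        },
    }
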
@@ -360,9 +635,18 @@ def main() -> NoReturn:
             if not lyr.TestCapability(ogr.OLCSequentialWrite):
                 raise RuntimeError(f'Output layer "{layername}" has no working '
                                    'CreateFeature() method')
-            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             sources = layerdef['sources']
+            if lyr_sourcecache is None:
+                sourcesMtimeNS = None
+            else:
+                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+                                                             mtimes_ns=sourcesMtimeNS)):
+                    logging.info('Output layer "%s" is up to date, skipping', layername)
+                    continue
+
+            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             # setup output field mapping in the sources dictionary
             setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@
             lyrTransaction = False
             try:
-                clearLayer(dso, lyr)
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
 
                 description = layerdef.get('description', None)
                 if (description is not None and
                     lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                     logging.warning('Could not set description metadata')
 
-                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+                                          mtimes=sourcesMtimeNS,
+                                          lyr_sourcecache=lyr_sourcecache,
+                                          cachedir=cachedir, extent=extent)
+                bChanged = bChanged or bChanged0
 
-                if isinstance(lyrTransaction, bool) and lyrTransaction:
-                    # commit transaction
-                    logging.debug('Committing transaction')
+                if isinstance(lyrTransaction, str):
+                    if not (bChanged0 or args.force):
+                        logging.info('No changes detected, rolling back to previous savepoint')
+                        query = 'ROLLBACK TO ' + lyrTransaction
+                        logging.debug(query)
+                        try:
+                            dso.ExecuteSQL(query)
+                        except Exception: # pylint: disable=broad-exception-caught
+                            logging.exception('Could not %s', query)
+                            rv = 1
+                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                     lyrTransaction = False
-                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                        logging.error('Could not commit transaction')
+                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                         rv = 1
 
             except Exception: # pylint: disable=broad-exception-caught
                 if isinstance(lyrTransaction, str):
                     query = 'ROLLBACK TO ' + lyrTransaction
-                    logging.exception('Exception occured in transaction, %s', query)
+                    logging.exception('Exception occurred within transaction, '
+                                      'rolling back to previous savepoint')
                     logging.debug(query)
                     dso.ExecuteSQL(query)
                 elif isinstance(lyrTransaction, bool) and lyrTransaction:
-                    logging.exception('Exception occured in transaction, rolling back')
-                    try:
-                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
-                            logging.error('Could not rollback transaction')
-                    except RuntimeError:
-                        logging.exception('Could not rollback transaction')
+                    logging.exception('Exception occurred within transaction, rolling back')
+                    rollbackTransactionTry(lyr)
                 else:
                     traceback.print_exc()
                 rv = 1
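importSources() now reports whether anything was written, and, per the commit
message, the dataset is flushed between individual source imports.  A rough
standalone sketch of that pattern (the real code lives in import_source.py,
with _importSource() standing in for the callables below):

    from osgeo import gdal

    def import_all(dso: gdal.Dataset, import_callables) -> bool:
        """Each callable stands in for one _importSource() call and returns
        True when it wrote something to the output layer."""
        changed = False
        for import_one in import_callables:
            changed = import_one() or changed
            # Flush buffered writes now: with the PostgreSQL driver this ends
            # the pending COPY (EndCopy), so errors surface here, early enough
            # for the caller to roll back the transaction or savepoint.
            dso.FlushCache()
        return changed
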
@@ -422,29 +714,28 @@
                     logging.debug(query)
                     dso.ExecuteSQL(query)
 
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
         if dsoTransaction:
             # commit transaction
-            logging.debug('Committing transaction')
             dsoTransaction = False
-            if dso.CommitTransaction() != ogr.OGRERR_NONE:
-                logging.error('Could not commit transaction')
+            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                 rv = 1
 
     except Exception: # pylint: disable=broad-exception-caught
         if dsoTransaction:
-            logging.exception('Exception occured in transaction, rolling back')
-            try:
-                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not rollback transaction')
-            except RuntimeError:
-                logging.exception('Could not rollback transaction')
+            logging.exception('Exception occurred within transaction, rolling back')
+            rollbackTransactionTry(dso)
         else:
             traceback.print_exc()
-        rv = 1
+        sys.exit(1)
 
-    dso = None
-    srs = None
-    extent = None
+    finally:
+        lyr_sourcecache = None
+        dso = None
+        srs = None
+        extent = None
 
     sys.exit(rv)
 
 gdal.UseExceptions()
-- 
cgit v1.2.3