Diffstat (limited to 'webmap-import')
-rwxr-xr-x  webmap-import  361
1 file changed, 326 insertions(+), 35 deletions(-)
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
 # pylint: disable=invalid-name, missing-module-docstring, fixme
 
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
 import os
+from stat import S_ISREG
 import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
 
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 import common
-from common import BadConfiguration, escape_identifier
+from common import (
+    BadConfiguration,
+    escape_identifier,
+    escape_literal_str,
+    getSourcePathLockFileName
+)
 from common_gdal import (
     gdalVersionMin,
     gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
     createOutputLayer,
     validateOutputLayer,
     clearLayer,
-    importSources
+    importSources,
+    getSourceCacheKey
 )
 
 def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
         for layername in toremove:
             layers.pop(layername)
 
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+                             name : str,
+                             typ : int,
+                             subtyp : int = ogr.OFSTNone,
+                             width : int = 0,
+                             isNullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the source cache layer/table."""
+    n = defn.GetFieldCount()
+    if idx >= n:
+        return False
+    defn = defn.GetFieldDefn(idx)
+
+    b = True
+    name2 = defn.GetName()
+    if name2 != name:
+        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+                        idx, name2, name)
+        b = False
+
+    if isNullable is not None and defn.IsNullable() != isNullable:
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+    typ2 = defn.GetType()
+    if typ2 != typ:
+        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+                        idx, name2,
+                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+        b = False
+
+    subtyp2 = defn.GetSubType()
+    if subtyp2 != subtyp:
+        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+                        'ignoring cache', idx, name2,
+                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+        b = False
+
+    width2 = defn.GetWidth()
+    if width2 != 0 and (width == 0 or width2 < width):
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+                        idx, name2, width2, width)
+    return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+                        force : bool = False) -> ogr.Layer|None:
+    """Get the source cache layer/table and validate it."""
+    if name is None:
+        return None
+
+    lyr = ds.GetLayerByName(name)
+    if lyr is None:
+        if not force: # show a warning if args.force is not set
+            logging.warning('Table "%s" does not exist, implying --force', name)
+        return None
+
+#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+#            gdalVersionMin(maj=3, min=9) and
+#            lyr.TestCapability(ogr.OLCUpdateFeature)):
+#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+#        # which was added in 3.9
+#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+#                        'ignoring cache', name)
+#        return None
+
+    defn = lyr.GetLayerDefn()
+    fields = [
+        { 'name': 'source_path', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'archive_member', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'layername', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 255 },
+        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+          'isNullable': True },
+    ]
+    m = len(fields)
+    n = defn.GetFieldCount()
+    if n < m:
+        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+                        name, n, m)
+    elif n != m:
+        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+        return None
+
+    n = defn.GetGeomFieldCount()
+    if n > 0:
+        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+                           for i in range(n) ]
+        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+                        name, n, ', '.join(geomFieldNames))
+
+    if gdalVersionMin(maj=3, min=5):
+        style = lyr.GetStyleTable()
+        if style is not None:
+            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+                            name, style.GetLastStyleName())
+    return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
+    """Return a dictionary mapping each source path to its last modification
+    time (in nanoseconds), or None if said source path is not a regular file."""
+    mtimes_ns = {}
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        mtimes_ns[source_path] = None
+    for source_path in mtimes_ns:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            pass
+    return mtimes_ns
+
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+                           mtimes_ns : dict[str,int|None]) -> bool:
+    """Return a boolean indicating whether the source cache layer/table is up to
+    date with respect to the dictionary mapping each source path to its last
+    modification time.
+    That is, every triplet (source_path, archive_member, layername) needs to
+    be present in the cache, and the corresponding mtime_ns needs to match the
+    stat(2) output of the source file on disk."""
+    ks = set()
+    for source in sources:
+        source_path, archive_member, layername = k = getSourceCacheKey(source)
+        if k in ks:
+            raise BadConfiguration(f'Duplicate key {k}')
+        ks.add(k)
+
+    if len(ks) == 0:
+        return False
+
+    attributeFilter = []
+    for source_path, archive_member, layername in ks:
+        attributeFilter.append('(' + escape_literal_str(source_path) + ','
+                                   + escape_literal_str(archive_member) + ','
+                                   + escape_literal_str(layername) + ')')
+    if len(attributeFilter) == 1:
+        attributeFilter = '= ' + attributeFilter[0]
+    else:
+        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+    lyr.SetAttributeFilter(attributeFilter)
+
+    cache = {}
+    feature = lyr.GetNextFeature()
+    while feature is not None:
+        k = (
+            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        )
+        if any(k0 is None for k0 in k):
+            # no column in the key may be NULL
+            raise RuntimeError(f'Bad key {k}')
+        if k in cache:
+            raise RuntimeError(f'Duplicate key {k}')
+        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+        feature = lyr.GetNextFeature()
+
+    for k in ks:
+        a = cache.get(k, None)
+        b = mtimes_ns[k[0]]
+        if a is None or b is None or a != b:
+            return True
+
+    for source_path in sorted({ p for p,_,_ in ks }):
+        # XXX datetime.fromtimestamp() doesn't support nanosecond input
+        # https://github.com/python/cpython/issues/59648
+        mtime = (mtimes_ns[source_path] // 1000)/1000000.
+        dt = datetime.fromtimestamp(mtime)
+        logging.info('Source file %s is unchanged (last modification time %s)',
+                     source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at
+    a time) to reduce the time during which the sources are prevented from
+    being updated/downloaded, but there is some value in having consistency
+    across the whole import process."""
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
Closed FDs are removed from + the dictionary in place.""" + toremove = set() + for path, lock_fd in lock_fds.items(): + try: + os.close(lock_fd) + except (OSError, ValueError): + logging.exception('Could not release lock for %s', path) + else: + logging.debug('Released lock for %s', path) + toremove.add(path) + for path in toremove: + lock_fds.pop(path) + +def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool: + """Either commit or rollback the transaction, depending on a condition + indicating whether there are pending new changes to be committed.""" + if commit: + logging.debug('Committing transaction') + if obj.CommitTransaction() == ogr.OGRERR_NONE: + return True + logging.error('Could not commit transaction') + else: + logging.info('No changes detected, rolling back transaction') + # don't call rollbackTransactionTry() as we don't want to catch exceptions + if obj.RollbackTransaction() == ogr.OGRERR_NONE: + return True + logging.error('Could not rollback transaction') + return False + +def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool: + """Try to rollback the current transaction on the dataset or layer, ignoring + any exception but logging errors and returning a boolean indicating + success.""" + try: + if obj.RollbackTransaction() == ogr.OGRERR_NONE: + return True + logging.error('Could not rollback transaction') + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction') + return False + # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements def main() -> NoReturn: common.init_logger(app=os.path.basename(__file__), level=logging.INFO) @@ -286,7 +546,11 @@ def main() -> NoReturn: parser.add_argument('--debug', action='count', default=0, help=argparse.SUPPRESS) parser.add_argument('--lockfile', default=None, - help='obtain an exclusive lock before starting unpacking and importing') + help='obtain an exclusive lock before processing') + parser.add_argument('--lockdir-sources', default=None, + help='optional directory for lock files to sources paths') + parser.add_argument('--force', default=False, action='store_true', + help='import even if no new changes were detected') parser.add_argument('groupname', nargs='*', help='group layer name(s) to process') args = parser.parse_args() @@ -313,8 +577,8 @@ def main() -> NoReturn: dso = openOutputDS(config['dataset']) validate_schema(layers, - drvo=dso.GetDriver(), - lco_defaults=config['dataset'].get('create-layer-options', None)) + drvo=dso.GetDriver(), + lco_defaults=config['dataset'].get('create-layer-options', None)) # get configured Spatial Reference System and extent srs = getSRS(config.get('SRS', None)) @@ -340,6 +604,11 @@ def main() -> NoReturn: createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None)) cachedir = Path(args.cachedir) if args.cachedir is not None else None + if args.lockdir_sources is None: + sourcePathLocks = None + else: + sourcePathLocks = lockSourcePaths(layerdefs=layers.values(), + lockdir=args.lockdir_sources) if (dso.TestCapability(ogr.ODsCTransactions) and dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')): @@ -350,6 +619,12 @@ def main() -> NoReturn: dso.GetDriver().ShortName) dsoTransaction = False + # get source cache layer/table + lyr_sourcecache = getSourceCacheLayer(dso, + config['dataset'].get('sourcecache-layername', None), + force=args.force) + + bChanged = False rv = 0 try: for layername, layerdef in layers.items(): @@ -360,9 
@@ -360,9 +635,18 @@ def main() -> NoReturn:
             if not lyr.TestCapability(ogr.OLCSequentialWrite):
                 raise RuntimeError(f'Output layer "{layername}" has no working '
                                    'CreateFeature() method')
-            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             sources = layerdef['sources']
+            if lyr_sourcecache is None:
+                sourcesMtimeNS = None
+            else:
+                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+                                                             mtimes_ns=sourcesMtimeNS)):
+                    logging.info('Output layer "%s" is up to date, skipping', layername)
+                    continue
+
+            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             # setup output field mapping in the sources dictionary
             setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
                 lyrTransaction = False
 
             try:
-                clearLayer(dso, lyr)
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
 
                 description = layerdef.get('description', None)
                 if (description is not None and
                         lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                     logging.warning('Could not set description metadata')
 
-                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+                                          mtimes=sourcesMtimeNS,
+                                          lyr_sourcecache=lyr_sourcecache,
+                                          cachedir=cachedir, extent=extent)
+                bChanged = bChanged or bChanged0
 
-                if isinstance(lyrTransaction, bool) and lyrTransaction:
-                    # commit transaction
-                    logging.debug('Committing transaction')
+                if isinstance(lyrTransaction, str):
+                    if not (bChanged0 or args.force):
+                        logging.info('No changes detected, rolling back to previous savepoint')
+                        query = 'ROLLBACK TO ' + lyrTransaction
+                        logging.debug(query)
+                        try:
+                            dso.ExecuteSQL(query)
+                        except Exception: # pylint: disable=broad-exception-caught
+                            logging.exception('Could not %s', query)
+                            rv = 1
+                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                     lyrTransaction = False
-                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                        logging.error('Could not commit transaction')
+                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                         rv = 1
 
             except Exception: # pylint: disable=broad-exception-caught
                 if isinstance(lyrTransaction, str):
                     query = 'ROLLBACK TO ' + lyrTransaction
-                    logging.exception('Exception occured in transaction, %s', query)
+                    logging.exception('Exception occurred within transaction, '
+                                      'rolling back to previous savepoint')
                     logging.debug(query)
                     dso.ExecuteSQL(query)
                 elif isinstance(lyrTransaction, bool) and lyrTransaction:
-                    logging.exception('Exception occured in transaction, rolling back')
-                    try:
-                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
-                            logging.error('Could not rollback transaction')
-                    except RuntimeError:
-                        logging.exception('Could not rollback transaction')
+                    logging.exception('Exception occurred within transaction, rolling back')
+                    rollbackTransactionTry(lyr)
                 else:
                     traceback.print_exc()
                 rv = 1
@@ -422,29 +714,28 @@ def main() -> NoReturn:
                 logging.debug(query)
                 dso.ExecuteSQL(query)
 
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
         if dsoTransaction:
             # commit transaction
-            logging.debug('Committing transaction')
             dsoTransaction = False
-            if dso.CommitTransaction() != ogr.OGRERR_NONE:
-                logging.error('Could not commit transaction')
+            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                rv = 1
 
     except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
-            logging.exception('Exception occured in transaction, rolling back')
-            try:
-                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not rollback transaction')
-            except RuntimeError:
-                logging.exception('Could not rollback transaction')
+            logging.exception('Exception occurred within transaction, rolling back')
+            rollbackTransactionTry(dso)
        else:
            traceback.print_exc()
-            rv = 1
+            sys.exit(1)
 
-    dso = None
-    srs = None
-    extent = None
+    finally:
+        lyr_sourcecache = None
+        dso = None
+        srs = None
+        extent = None
     sys.exit(rv)
 
 gdal.UseExceptions()
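
Note (editor): the commit validates but does not create the source cache table. A minimal sketch of a compatible table, assuming a GeoPackage output dataset and 'sourcecache' as the configured sourcecache-layername (both names are illustrative, not taken from the commit):

    # Sketch only: create a table that getSourceCacheLayer() would accept.
    from osgeo import gdal, ogr

    gdal.UseExceptions()
    ds = ogr.GetDriverByName('GPKG').CreateDataSource('/tmp/webmap.gpkg')
    lyr = ds.CreateLayer('sourcecache', geom_type=ogr.wkbNone)  # no geometry field
    for name, width in (('source_path', 2047),
                        ('archive_member', 2047),
                        ('layername', 255)):
        fld = ogr.FieldDefn(name, ogr.OFTString)
        fld.SetWidth(width)
        fld.SetNullable(False)
        lyr.CreateField(fld)
    lyr.CreateField(ogr.FieldDefn('mtime_ns', ogr.OFTInteger64))  # nullable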
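Note (editor): the shared locks taken by lockSourcePaths() only pay off if the process refreshing the sources takes an exclusive lock on the same per-source lock file. A sketch of that writer side, assuming the same lock directory and getSourcePathLockFileName() naming scheme (the helper below is hypothetical, for illustration):

    # Sketch only: writer-side counterpart of lockSourcePaths().
    import os
    from fcntl import flock, LOCK_EX, LOCK_UN

    def lockSourceExclusive(lockdir_fd: int, lockfile: str) -> int:
        """Block until no importer holds LOCK_SH, then take LOCK_EX."""
        fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC,
                     mode=0o664, dir_fd=lockdir_fd)
        flock(fd, LOCK_EX)  # waits out all shared-lock holders
        return fd

    # refresh/download the source file, then release:
    #     flock(fd, LOCK_UN); os.close(fd)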
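Note (editor): on the XXX comment in isSourceCacheDirtyTime(): the dirty check itself compares raw integer st_mtime_ns values, so no precision is lost there; only the informational log message truncates to microseconds, because datetime.fromtimestamp() takes float seconds (CPython issue 59648). A standalone illustration of the conversion used above:

    from datetime import datetime

    mtime_ns = 1_700_000_000_123_456_789   # hypothetical st_mtime_ns value
    mtime = (mtime_ns // 1000) / 1000000.  # ns -> float seconds, microsecond precision
    dt = datetime.fromtimestamp(mtime)
    print(dt.astimezone().isoformat(timespec='seconds'))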