From e5e8a6548ef156b785aae1b8a37fe71f26146061 Mon Sep 17 00:00:00 2001 From: Guilhem Moulin Date: Sat, 19 Apr 2025 13:27:49 +0200 Subject: webmap-import: Add a cache layer and store the source file's last modification time. That way we can avoid the expensive unpack+import when the source file(s) have not been updated since the last run. The check can be bypassed with a new flag `--force`. We use a sequence for the FID:s (primary key) and a UNIQUE constraint on triplets (source_path, archive_member, layername) as GDAL doesn't support multicolumns primary keys. To avoid races between the stat(2) calls, gdal.OpenEx() and updates via `webmap-download` runs we place a shared lock on the downloaded files. One could resort to some tricks to eliminate the race between the first two, but there is also some value in having consistency during the entire execution of the script (a single source file can be used by multiple layers for instance, and it makes sense to use the very same file for all layers in that case). We also intersperse dso.FlushCache() calls between _importSource() calls in order to force the PG driver to call EndCopy() to detect errors and trigger a rollback when _importSource() fails. --- import_source.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) (limited to 'import_source.py') diff --git a/import_source.py b/import_source.py index e30a245..1fa754c 100644 --- a/import_source.py +++ b/import_source.py @@ -37,7 +37,7 @@ from osgeo.gdalconst import ( ) from osgeo import gdalconst -from common import BadConfiguration, escape_identifier +from common import BadConfiguration, escape_identifier, escape_literal_str from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset: @@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str], logging.debug('Ignoring archive member %s', name) return members +def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]: + """Get the unique triplet (source_path, archive_member, layername) + for the layer source which can be used for attribute filters and + lookups.""" + importdef = source['import'] + sourcedef = source['source'] + source_path = sourcedef['path'] + archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else '' + layername = importdef.get('layername', '') + return (source_path, archive_member, layername) + # pylint: disable-next=dangerous-default-value def importSources(lyr_dst : ogr.Layer, + dso : gdal.Dataset, sources : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: - """Import source layers""" + mtimes : dict[str, int|None]|None = None, + lyr_sourcecache : ogr.Layer|None = None, + extent : ogr.Geometry|None = None) -> bool: + """Import source layers.""" + bChanged = False for source in sources: _importSource(lyr_dst, **source['source'], args=source['import'], cachedir=cachedir, extent=extent) + # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed + # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead + dso.FlushCache() + + if lyr_sourcecache is None: + bChanged = True + continue + + source_path, archive_member, layername = getSourceCacheKey(source) + attributeFilter = ('(source_path,archive_member,layername) = (' + + escape_literal_str(source_path) + ',' + + escape_literal_str(archive_member) + ',' + + escape_literal_str(layername) + ')') + lyr_sourcecache.SetAttributeFilter(attributeFilter) + + feature = lyr_sourcecache.GetNextFeature() + if feature is not None: + logging.debug('Updating existing feature in source cache for %s', attributeFilter) + update = True + assert lyr_sourcecache.GetNextFeature() is None + else: + logging.debug('Creating new feature in source cache for %s', attributeFilter) + update = False + feature = ogr.Feature(lyr_sourcecache.GetLayerDefn()) + feature.SetFieldString(0, source_path) + feature.SetFieldString(1, archive_member) + feature.SetFieldString(2, layername) + + mtime_ns = mtimes[source_path] + if mtime_ns is None: + feature.SetFieldNull(3) + else: + feature.SetFieldInteger64(3, mtime_ns) + + if update: + # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead + if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE: + raise RuntimeError('Could not update feature in sourcecache') + else: + if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE: + raise RuntimeError('Could not create new feature in sourcecache') + + # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead + dso.FlushCache() + bChanged = True # TODO fingerprint the features to detect changes + + return bChanged + # pylint: disable-next=dangerous-default-value def _importSource(lyr : ogr.Layer, path : str = '/nonexistent', -- cgit v1.2.3