aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-19 13:27:49 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-23 12:09:24 +0200
commite5e8a6548ef156b785aae1b8a37fe71f26146061 (patch)
treeff774b2dbccb133f0f75d4731de9e302dfcc59bf /import_source.py
parentc33799f69e7eb42cb0ab4735c7e878d74faca16a (diff)
webmap-import: Add a cache layer and store the source file's last modification time.
That way we can avoid the expensive unpack+import when the source file(s) have not been updated since the last run. The check can be bypassed with a new flag `--force`. We use a sequence for the FIDs (primary key) and a UNIQUE constraint on triplets (source_path, archive_member, layername) as GDAL doesn't support multi-column primary keys. To avoid races between the stat(2) calls, gdal.OpenEx() and updates via `webmap-download` runs, we place a shared lock on the downloaded files. One could resort to some tricks to eliminate the race between the first two, but there is also some value in having consistency during the entire execution of the script (a single source file can be used by multiple layers, for instance, and it makes sense to use the very same file for all layers in that case). We also intersperse dso.FlushCache() calls between _importSource() calls in order to force the PG driver to call EndCopy(), so that errors are detected and a rollback is triggered when _importSource() fails.
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py69
1 file changed, 66 insertions, 3 deletions
diff --git a/import_source.py b/import_source.py
index e30a245..1fa754c 100644
--- a/import_source.py
+++ b/import_source.py
@@ -37,7 +37,7 @@ from osgeo.gdalconst import (
)
from osgeo import gdalconst
-from common import BadConfiguration, escape_identifier
+from common import BadConfiguration, escape_identifier, escape_literal_str
from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
@@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
    """Return the unique triplet (source_path, archive_member, layername)
    identifying a layer source, suitable for attribute filters and cache
    lookups."""
    importdef = source['import']
    sourcedef = source['source']
    # The archive member path is only meaningful when the source file
    # needs unarchiving; otherwise the member slot stays empty.
    if sourcedef.get('unar') is not None:
        member = importdef['path']
    else:
        member = ''
    return (sourcedef['path'], member, importdef.get('layername', ''))
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
                  dso : gdal.Dataset,
                  sources : dict[str,Any] = {},
                  cachedir : Path|None = None,
                  mtimes : dict[str, int|None]|None = None,
                  lyr_sourcecache : ogr.Layer|None = None,
                  extent : ogr.Geometry|None = None) -> bool:
    """Import source layers into lyr_dst and record their modification
    times in the source cache.

    Each entry in sources is a mapping with 'source' (file location) and
    'import' (layer/import options) sub-dicts.  After each import the
    output dataset dso is flushed so errors surface immediately.  When
    lyr_sourcecache is given, a row keyed on the triplet returned by
    getSourceCacheKey() is created or updated with the source file's
    mtime taken from mtimes (NOTE(review): mtimes must then be non-None
    and contain every source path, or this raises — confirm callers).

    Returns True if anything was (potentially) changed in lyr_dst.
    Raises RuntimeError when a source-cache row cannot be written."""
    bChanged = False
    for source in sources:
        _importSource(lyr_dst, **source['source'],
                      args=source['import'],
                      cachedir=cachedir,
                      extent=extent)
        # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
        # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
        dso.FlushCache()

        # Without a source cache we cannot tell whether anything changed,
        # so conservatively report a change.
        if lyr_sourcecache is None:
            bChanged = True
            continue

        # Look up the cache row for this source via the unique triplet
        # (source_path, archive_member, layername).
        source_path, archive_member, layername = getSourceCacheKey(source)
        attributeFilter = ('(source_path,archive_member,layername) = ('
            + escape_literal_str(source_path) + ','
            + escape_literal_str(archive_member) + ','
            + escape_literal_str(layername) + ')')
        lyr_sourcecache.SetAttributeFilter(attributeFilter)

        feature = lyr_sourcecache.GetNextFeature()
        if feature is not None:
            logging.debug('Updating existing feature in source cache for %s', attributeFilter)
            update = True
            # the triplet is UNIQUE, so at most one row can match
            assert lyr_sourcecache.GetNextFeature() is None
        else:
            logging.debug('Creating new feature in source cache for %s', attributeFilter)
            update = False
            # fields 0..2 hold the key triplet, in the same order as the filter
            feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
            feature.SetFieldString(0, source_path)
            feature.SetFieldString(1, archive_member)
            feature.SetFieldString(2, layername)

        # field 3 holds the source file's last-modification time (ns),
        # NULL when unknown
        mtime_ns = mtimes[source_path]
        if mtime_ns is None:
            feature.SetFieldNull(3)
        else:
            feature.SetFieldInteger64(3, mtime_ns)

        if update:
            # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
            if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
                raise RuntimeError('Could not update feature in sourcecache')
        else:
            if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
                raise RuntimeError('Could not create new feature in sourcecache')

        # flush again so the cache row write is pushed through (and any
        # error detected) before moving on to the next source
        # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
        dso.FlushCache()
        bChanged = True # TODO fingerprint the features to detect changes

    return bChanged
+
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
path : str = '/nonexistent',