diff options
author | Guilhem Moulin <guilhem@fripost.org> | 2025-04-23 17:55:57 +0200 |
---|---|---|
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-04-24 16:49:37 +0200 |
commit | 80882acd6ba407847fed0ef308e440b88796e0e1 (patch) | |
tree | 7084e4508c9ad3aa72c034735e8cbc85738af93c /import_source.py | |
parent | c42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff) |
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes.
Comparing modification times is not enough since some sources (for
instance Naturvårdsverket's SCI_Rikstackande) are updated on the server
even though no objects are being added; the source layer remains
unchanged but the file differs because of OBJECTID changes we are not
interested in.
Instead of adding another cache layer/table for fingerprints, we now cache
destination layer names in place of the triplets (source_path, archive_member,
layername), and store the time at which the import was started in place of
source_path's mtime.
There is indeed no value in storing the exact mtime of source_path in the
cache. What we need is simply a way to detect whether source paths have
been updated in a subsequent run. Thanks to the shared locks the ctime
of any updated source path will be at least the time when the locks are
released, thereby exceeding the last_updated value.
Diffstat (limited to 'import_source.py')
-rw-r--r-- | import_source.py | 118 |
1 file changed, 52 insertions, 66 deletions
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
                  sources : dict[str,Any] = {},
                  cachedir : Path|None = None,
                  extent : ogr.Geometry|None = None) -> None:
    """Import each configured source into the destination layer lyr_dst.

    Every item of sources must provide a 'source' mapping, which is
    expanded into keyword arguments of _importSource(), and an 'import'
    entry, which is passed along as its `args` argument.  cachedir and
    extent are forwarded unchanged for every source."""
    for entry in sources:
        source_kwargs = entry['source']
        import_args = entry['import']
        _importSource(lyr_dst, **source_kwargs,
                      args=import_args,
                      cachedir=cachedir,
                      extent=extent)
def updateLayerCache(layername : str, cache : ogr.Layer,
                     last_updated : datetime) -> None:
    """Update attributes in the layer cache for the given layer name.

    Look up the feature of the cache layer whose `layername` attribute
    equals layername, create it if there is none, set its field #1 to
    last_updated, and write the feature back to the cache layer.

    The UTC offset of last_updated is recorded as an OGR TZFlag.  The
    flag is left at 0 (unknown) when gdalVersionMin(maj=3, min=8) is
    false, and also when last_updated is a naive datetime, i.e., one
    without UTC offset.

    The attribute filter installed on cache is left in place on return.

    Raises RuntimeError if the UTC offset of last_updated is not a
    multiple of 15 minutes within ±14:00, or if the feature could not be
    created or updated in the cache layer."""
    attributeFilter = 'layername = ' + escape_literal_str(layername)
    logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
    cache.SetAttributeFilter(attributeFilter)

    feature = cache.GetNextFeature()
    if feature is None:
        # not in cache
        logging.debug('Creating new feature in layer cache for %s', attributeFilter)
        update = False
        feature = ogr.Feature(cache.GetLayerDefn())
        feature.SetFieldString(0, layername)
    else:
        logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
        update = True
        # the filter must not match more than one feature
        assert cache.GetNextFeature() is None

    if not gdalVersionMin(maj=3, min=8):
        # NOTE(review): presumably the ogr.TZFLAG_* constants are not
        # available in this case, hence the literal value -- confirm
        tzFlag = 0 # ogr.TZFLAG_UNKNOWN
    elif last_updated.tzinfo == UTC:
        tzFlag = ogr.TZFLAG_UTC
    else:
        td = last_updated.utcoffset()
        if td is None:
            # naive datetime: there is no UTC offset to record, so store
            # the value with an unknown time zone rather than failing
            # with an AttributeError below
            tzFlag = 0 # ogr.TZFLAG_UNKNOWN
        else:
            # 15min increments/decrements per unit above/below UTC, cf.
            # https://gdal.org/en/stable/api/vector_c_api.html#c.OGR_TZFLAG_UTC
            # (timedelta normalizes negative offsets to days=-1 and
            # seconds>=0, so the sum below is negative west of UTC)
            tzFlag = td.days * 96 + td.seconds // 900
            if timedelta(seconds=tzFlag*900) != td or abs(tzFlag) > 56: # max ±14:00
                raise RuntimeError(f'Invalid UTC offset {td}')
            tzFlag += ogr.TZFLAG_UTC

    feature.SetField(1, last_updated.year,
                        last_updated.month,
                        last_updated.day,
                        last_updated.hour,
                        last_updated.minute,
                        float(last_updated.second) + float(last_updated.microsecond)/1000000.,
                        tzFlag)

    if update:
        # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
        if cache.SetFeature(feature) != ogr.OGRERR_NONE:
            raise RuntimeError('Could not update feature in layer cache')
    else:
        if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
            raise RuntimeError('Could not create new feature in layer cache')