about summary refs log tree commit diff stats
path: root/import_source.py
diff options
context:
space:
mode:
author    Guilhem Moulin <guilhem@fripost.org>  2025-04-23 17:55:57 +0200
committer Guilhem Moulin <guilhem@fripost.org>  2025-04-24 16:49:37 +0200
commit    80882acd6ba407847fed0ef308e440b88796e0e1 (patch)
tree      7084e4508c9ad3aa72c034735e8cbc85738af93c /import_source.py
parent    c42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff)
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes. Comparing modification times is not enough since some sources (for instance Naturvårdsverket's SCI_Rikstackande) are updated on the server even though no objects are being added; the source layer remains unchanged but the file differs because of OBJECTID changes we are not interested in. Rather than using another cache layer/table for fingerprints, we cache destination layernames rather than triplets (source_path, archive_member, layername), along with the time at which the import was started rather than source_path's mtime. There is indeed no value in having exact source_path's mtime in the cache. What we need is simply a way to detect whether source paths have been updated in a subsequent run. Thanks to the shared locks the ctime of any updated source path will be at least the time when the locks are released, thereby exceeding the last_updated value.
Diffstat (limited to 'import_source.py')
-rw-r--r--  import_source.py  118
1 file changed, 52 insertions, 66 deletions
diff --git a/import_source.py b/import_source.py
index 1fa754c..8ce69a9 100644
--- a/import_source.py
+++ b/import_source.py
@@ -25,6 +25,7 @@ import tempfile
import re
from fnmatch import fnmatchcase
from pathlib import Path
+from datetime import datetime, timedelta, UTC
from typing import Any, Optional
from osgeo import gdal, ogr, osr
@@ -38,7 +39,7 @@ from osgeo.gdalconst import (
from osgeo import gdalconst
from common import BadConfiguration, escape_identifier, escape_literal_str
-from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
+from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, formatTZFlag
def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
"""Open and return the output DS. It is created if create=False or
@@ -431,81 +432,18 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
-def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
- """Get the unique triplet (source_path, archive_member, layername)
- for the layer source which can be used for attribute filters and
- lookups."""
- importdef = source['import']
- sourcedef = source['source']
- source_path = sourcedef['path']
- archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
- layername = importdef.get('layername', '')
- return (source_path, archive_member, layername)
-
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
- dso : gdal.Dataset,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- mtimes : dict[str, int|None]|None = None,
- lyr_sourcecache : ogr.Layer|None = None,
- extent : ogr.Geometry|None = None) -> bool:
- """Import source layers."""
- bChanged = False
+ extent : ogr.Geometry|None = None) -> None:
+ """Import source layers to lyr_dst."""
for source in sources:
_importSource(lyr_dst, **source['source'],
args=source['import'],
cachedir=cachedir,
extent=extent)
- # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
- # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
- dso.FlushCache()
-
- if lyr_sourcecache is None:
- bChanged = True
- continue
-
- source_path, archive_member, layername = getSourceCacheKey(source)
- attributeFilter = ('(source_path,archive_member,layername) = ('
- + escape_literal_str(source_path) + ','
- + escape_literal_str(archive_member) + ','
- + escape_literal_str(layername) + ')')
- lyr_sourcecache.SetAttributeFilter(attributeFilter)
-
- feature = lyr_sourcecache.GetNextFeature()
- if feature is not None:
- logging.debug('Updating existing feature in source cache for %s', attributeFilter)
- update = True
- assert lyr_sourcecache.GetNextFeature() is None
- else:
- logging.debug('Creating new feature in source cache for %s', attributeFilter)
- update = False
- feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
- feature.SetFieldString(0, source_path)
- feature.SetFieldString(1, archive_member)
- feature.SetFieldString(2, layername)
-
- mtime_ns = mtimes[source_path]
- if mtime_ns is None:
- feature.SetFieldNull(3)
- else:
- feature.SetFieldInteger64(3, mtime_ns)
-
- if update:
- # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
- if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
- raise RuntimeError('Could not update feature in sourcecache')
- else:
- if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
- raise RuntimeError('Could not create new feature in sourcecache')
-
- # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
- dso.FlushCache()
- bChanged = True # TODO fingerprint the features to detect changes
-
- return bChanged
-
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
path : str = '/nonexistent',
@@ -830,3 +768,51 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
for t,n in sorted(mismatch.items(), key=lambda x: x[1]) ]
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
+
+def updateLayerCache(layername : str, cache : ogr.Layer,
+ last_updated : datetime) -> None:
+ """Update attributes in the layer cache for the given layer name."""
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+ cache.SetAttributeFilter(attributeFilter)
+
+ feature = cache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ logging.debug('Creating new feature in layer cache for %s', attributeFilter)
+ update = False
+ feature = ogr.Feature(cache.GetLayerDefn())
+ feature.SetFieldString(0, layername)
+ else:
+ logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
+ update = True
+ assert cache.GetNextFeature() is None
+
+ if not gdalVersionMin(maj=3, min=8):
+ tzFlag = 0 # ogr.TZFLAG_UNKNOWN
+ elif last_updated.tzinfo == UTC:
+ tzFlag = ogr.TZFLAG_UTC
+ else:
+ td = last_updated.utcoffset()
+ # 15min increments/decrements per unit above/below UTC, cf.
+ # https://gdal.org/en/stable/api/vector_c_api.html#c.OGR_TZFLAG_UTC
+ tzFlag = td.days * 96 + td.seconds // 900
+ if timedelta(seconds=tzFlag*900) != td or abs(tzFlag) > 56: # max ±14:00
+ raise RuntimeError(f'Invalid UTC offset {td}')
+ tzFlag += ogr.TZFLAG_UTC
+
+ feature.SetField(1, last_updated.year,
+ last_updated.month,
+ last_updated.day,
+ last_updated.hour,
+ last_updated.minute,
+ float(last_updated.second) + float(last_updated.microsecond)/1000000.,
+ tzFlag)
+
+ if update:
+ # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+ if cache.SetFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not update feature in layer cache')
+ else:
+ if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not create new feature in layer cache')