author     Guilhem Moulin <guilhem@fripost.org>    2025-04-23 17:55:57 +0200
committer  Guilhem Moulin <guilhem@fripost.org>    2025-04-24 16:49:37 +0200
commit     80882acd6ba407847fed0ef308e440b88796e0e1 (patch)
tree       7084e4508c9ad3aa72c034735e8cbc85738af93c
parent     c42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff)
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes.
Comparing modification times is not enough, since some sources (for
instance Naturvårdsverket's SCI_Rikstackande) are updated on the server
even though no objects are added; the source layer remains unchanged,
but the file differs because of OBJECTID changes we are not interested
in.
Instead of adding another cache layer/table for fingerprints, we key the
cache on destination layernames rather than triplets (source_path,
archive_member, layername), and store the time at which the import was
started instead of source_path's mtime.
There is indeed no value in keeping source_path's exact mtime in the
cache: all we need is a way to detect whether source paths have been
updated in a subsequent run.  Thanks to the shared locks, the ctime of
any updated source path will be at least the time when the locks are
released, thereby exceeding the last_updated value.
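
In other words, the check reduces to comparing each source file's
mtime/ctime against the cached import start time.  A minimal standalone
sketch of that test (hypothetical helper and parameter names, simplified
from the OGR-backed implementation in the diff below):

    import os
    from datetime import datetime

    def needs_reimport(source_paths: list[str], last_updated: datetime | None) -> bool:
        """Reimport iff a source file's mtime or ctime exceeds the cached
        last_updated timestamp, or the layer isn't cached yet."""
        if last_updated is None:  # no cache entry for this layer
            return True
        cutoff_ns = int(last_updated.timestamp() * 1_000_000) * 1_000
        for path in source_paths:
            st = os.stat(path)
            # take max(mtime, ctime): ctime also catches in-place rewrites,
            # and with the shared locks held during import any later update
            # must postdate the lock release, hence exceed last_updated
            if max(st.st_mtime_ns, st.st_ctime_ns) > cutoff_ns:
                return True
        return False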
-rw-r--r--   config.yml        |   2
-rw-r--r--   import_source.py  | 118
-rw-r--r--   schema.sql        |  48
-rwxr-xr-x   webmap-import     | 280
4 files changed, 224 insertions, 224 deletions
diff --git a/config.yml b/config.yml
--- a/config.yml
+++ b/config.yml
@@ -108,7 +108,7 @@ dataset:
   open-options-publish:
     USER: webmap_guest
 
-  sourcecache-layername: public.sourcecache
+  layercache: public.layercache
 
   # Optional dictionary of default layer creation options, cf.
   # https://gdal.org/drivers/vector/pg.html#layer-creation-options or
diff --git a/import_source.py b/import_source.py
index 1fa754c..8ce69a9 100644
--- a/import_source.py
+++ b/import_source.py
@@ -25,6 +25,7 @@ import tempfile
 import re
 from fnmatch import fnmatchcase
 from pathlib import Path
+from datetime import datetime, timedelta, UTC
 from typing import Any, Optional
 
 from osgeo import gdal, ogr, osr
@@ -38,7 +39,7 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 
 from common import BadConfiguration, escape_identifier, escape_literal_str
-from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
+from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, formatTZFlag
 
 def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
     """Open and return the output DS. It is created if create=False or
@@ -431,81 +432,18 @@ def listArchiveMembers(namelist : list[str],
             logging.debug('Ignoring archive member %s', name)
     return members
 
-def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
-    """Get the unique triplet (source_path, archive_member, layername)
-    for the layer source which can be used for attribute filters and
-    lookups."""
-    importdef = source['import']
-    sourcedef = source['source']
-    source_path = sourcedef['path']
-    archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
-    layername = importdef.get('layername', '')
-    return (source_path, archive_member, layername)
-
 # pylint: disable-next=dangerous-default-value
 def importSources(lyr_dst : ogr.Layer,
-                  dso : gdal.Dataset,
                   sources : dict[str,Any] = {},
                   cachedir : Path|None = None,
-                  mtimes : dict[str, int|None]|None = None,
-                  lyr_sourcecache : ogr.Layer|None = None,
-                  extent : ogr.Geometry|None = None) -> bool:
-    """Import source layers."""
-    bChanged = False
+                  extent : ogr.Geometry|None = None) -> None:
+    """Import source layers to lyr_dst."""
     for source in sources:
         _importSource(lyr_dst, **source['source'],
                       args=source['import'],
                       cachedir=cachedir, extent=extent)
-        # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
-        # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
-        dso.FlushCache()
-
-        if lyr_sourcecache is None:
-            bChanged = True
-            continue
-
-        source_path, archive_member, layername = getSourceCacheKey(source)
-        attributeFilter = ('(source_path,archive_member,layername) = ('
-                           + escape_literal_str(source_path) + ','
-                           + escape_literal_str(archive_member) + ','
-                           + escape_literal_str(layername) + ')')
-        lyr_sourcecache.SetAttributeFilter(attributeFilter)
-
-        feature = lyr_sourcecache.GetNextFeature()
-        if feature is not None:
-            logging.debug('Updating existing feature in source cache for %s', attributeFilter)
-            update = True
-            assert lyr_sourcecache.GetNextFeature() is None
-        else:
-            logging.debug('Creating new feature in source cache for %s', attributeFilter)
-            update = False
-            feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
-            feature.SetFieldString(0, source_path)
-            feature.SetFieldString(1, archive_member)
-            feature.SetFieldString(2, layername)
-
-        mtime_ns = mtimes[source_path]
-        if mtime_ns is None:
-            feature.SetFieldNull(3)
-        else:
-            feature.SetFieldInteger64(3, mtime_ns)
-
-        if update:
-            # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
-            if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
-                raise RuntimeError('Could not update feature in sourcecache')
-        else:
-            if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
-                raise RuntimeError('Could not create new feature in sourcecache')
-
-        # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
-        dso.FlushCache()
-        bChanged = True # TODO fingerprint the features to detect changes
-
-    return bChanged
-
 # pylint: disable-next=dangerous-default-value
 def _importSource(lyr : ogr.Layer,
                   path : str = '/nonexistent',
@@ -830,3 +768,51 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
                        for t,n in sorted(mismatch.items(), key=lambda x: x[1]) ]
         logging.info('Forced conversion to %s: %s',
                      ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
+
+def updateLayerCache(layername : str, cache : ogr.Layer,
+                     last_updated : datetime) -> None:
+    """Update attributes in the layer cache for the given layer name."""
+    attributeFilter = 'layername = ' + escape_literal_str(layername)
+    logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+    cache.SetAttributeFilter(attributeFilter)
+
+    feature = cache.GetNextFeature()
+    if feature is None:
+        # not in cache
+        logging.debug('Creating new feature in layer cache for %s', attributeFilter)
+        update = False
+        feature = ogr.Feature(cache.GetLayerDefn())
+        feature.SetFieldString(0, layername)
+    else:
+        logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
+        update = True
+        assert cache.GetNextFeature() is None
+
+    if not gdalVersionMin(maj=3, min=8):
+        tzFlag = 0 # ogr.TZFLAG_UNKNOWN
+    elif last_updated.tzinfo == UTC:
+        tzFlag = ogr.TZFLAG_UTC
+    else:
+        td = last_updated.utcoffset()
+        # 15min increments/decrements per unit above/below UTC, cf.
+        # https://gdal.org/en/stable/api/vector_c_api.html#c.OGR_TZFLAG_UTC
+        tzFlag = td.days * 96 + td.seconds // 900
+        if timedelta(seconds=tzFlag*900) != td or abs(tzFlag) > 56: # max ±14:00
+            raise RuntimeError(f'Invalid UTC offset {td}')
+        tzFlag += ogr.TZFLAG_UTC
+
+    feature.SetField(1, last_updated.year,
+                        last_updated.month,
+                        last_updated.day,
+                        last_updated.hour,
+                        last_updated.minute,
+                        float(last_updated.second) + float(last_updated.microsecond)/1000000.,
+                        tzFlag)
+
+    if update:
+        # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+        if cache.SetFeature(feature) != ogr.OGRERR_NONE:
+            raise RuntimeError('Could not update feature in layer cache')
+    else:
+        if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
+            raise RuntimeError('Could not create new feature in layer cache')
diff --git a/schema.sql b/schema.sql
--- a/schema.sql
+++ b/schema.sql
@@ -2751,32 +2751,30 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
 
 --
--- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+-- Name: layercache; Type: TABLE; Schema: public; Owner: webmap_import
 --
 
-CREATE TABLE public.sourcecache (
+CREATE TABLE public.layercache (
     ogc_fid bigint NOT NULL,
-    source_path character varying(2047) NOT NULL,
-    archive_member character varying(2047) NOT NULL,
     layername character varying(255) NOT NULL,
-    mtime_ns bigint
+    last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
 );
 
-ALTER TABLE public.sourcecache OWNER TO webmap_import;
+ALTER TABLE public.layercache OWNER TO webmap_import;
 
 --
--- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+-- Name: TABLE layercache; Type: COMMENT; Schema: public; Owner: webmap_import
 --
 
-COMMENT ON TABLE public.sourcecache IS 'Metadata cache for sources files';
+COMMENT ON TABLE public.layercache IS 'Layer metadata cache';
 
 --
--- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+-- Name: layercache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
 --
 
-CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+CREATE SEQUENCE public.layercache_ogc_fid_seq
     START WITH 1
    INCREMENT BY 1
    NO MINVALUE
@@ -2784,13 +2782,13 @@ CREATE SEQUENCE public.sourcecache_ogc_fid_seq
     CACHE 1;
 
-ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+ALTER TABLE public.layercache_ogc_fid_seq OWNER TO webmap_import;
 
 --
--- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+-- Name: layercache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
 --
 
-ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+ALTER SEQUENCE public.layercache_ogc_fid_seq OWNED BY public.layercache.ogc_fid;
 
 --
@@ -3165,10 +3163,10 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT ne
 
 --
--- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+-- Name: layercache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
 --
 
-ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+ALTER TABLE ONLY public.layercache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.layercache_ogc_fid_seq'::regclass);
 
 --
@@ -4044,19 +4042,19 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
 
 --
--- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+-- Name: layercache layercache_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
 --
 
-ALTER TABLE ONLY public.sourcecache
-    ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+ALTER TABLE ONLY public.layercache
+    ADD CONSTRAINT layercache_layername_key UNIQUE (layername);
 
 --
--- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+-- Name: layercache layercache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
 --
 
-ALTER TABLE ONLY public.sourcecache
-    ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+ALTER TABLE ONLY public.layercache
+    ADD CONSTRAINT layercache_pkey PRIMARY KEY (ogc_fid);
 
 --
@@ -5210,17 +5208,17 @@ GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap
 
 --
--- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+-- Name: TABLE layercache; Type: ACL; Schema: public; Owner: webmap_import
 --
 
-GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+GRANT SELECT ON TABLE public.layercache TO webmap_guest;
 
 --
--- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+-- Name: SEQUENCE layercache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
 --
 
-GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+GRANT SELECT,USAGE ON SEQUENCE public.layercache_ogc_fid_seq TO webmap_guest;
 
 --
diff --git a/webmap-import b/webmap-import
index fd10705..0bf9c9e 100755
--- a/webmap-import
+++ b/webmap-import
@@ -28,7 +28,8 @@ from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
-from datetime import datetime
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
@@ -65,7 +66,7 @@ from import_source import (
     validateOutputLayer,
     clearLayer,
     importSources,
-    getSourceCacheKey
+    updateLayerCache
 )
 
 def setFieldIf(cond : bool,
@@ -284,13 +285,14 @@ def validate_sources(layers : dict[str, Any]) -> None:
     for layername in toremove:
         layers.pop(layername)
 
-def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
-                             name : str,
-                             typ : int,
-                             subtyp : int = ogr.OFSTNone,
-                             width : int = 0,
-                             isNullable : Optional[bool] = None) -> bool:
-    """Validate field #idx from the source cache layer/table."""
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+                            name : str,
+                            typ : int,
+                            subtyp : int = ogr.OFSTNone,
+                            width : int = 0,
+                            unique : Optional[bool] = None,
+                            nullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the layer cache table."""
     n = defn.GetFieldCount()
     if idx >= n:
         return False
@@ -299,173 +301,175 @@ def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
     b = True
     name2 = defn.GetName()
     if name2 != name:
-        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
-                        idx, name2, name)
+        logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
         b = False
 
-    if isNullable is not None and defn.IsNullable() != isNullable:
+    if nullable is not None and defn.IsNullable() != nullable:
         # non-fatal
-        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
-                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+        logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+    if unique is not None and defn.IsUnique() != unique:
+        # non-fatal
+        logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+                        idx, name2, 'is' if defn.IsUnique() else 'isn\'t')
 
     typ2 = defn.GetType()
     if typ2 != typ:
-        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
-                        idx, name2,
+        logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
                         ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
         b = False
 
     subtyp2 = defn.GetSubType()
     if subtyp2 != subtyp:
-        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
-                        'ignoring cache', idx, name2,
+        logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
                         ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
         b = False
 
     width2 = defn.GetWidth()
     if width2 != 0 and (width == 0 or width2 < width):
         # non-fatal
-        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+        logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
                         idx, name2, width2, width)
 
     return b
 
-def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
-                        force : bool = False) -> ogr.Layer|None:
-    """Get the source cache layer/table and validate it."""
-    if name is None:
-        return None
-
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+    """Validate layer cache table."""
     lyr = ds.GetLayerByName(name)
     if lyr is None:
-        if not force: # show a warning if args.force is not set
-            logging.warning('Table "%s" does not exist, implying --force', name)
-        return None
+        logging.warning('Table "%s" does not exist', name)
+        return False
 
 #    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
-#            gdalVersionMin(maj=3, min=9) and
+#            gdalVersionMin(maj=3, min=7) and
 #            lyr.TestCapability(ogr.OLCUpdateFeature)):
-#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
-#        # which was added in 3.9
 #        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
 #                        'ignoring cache', name)
 #        return None
 
     defn = lyr.GetLayerDefn()
     fields = [
-        { 'name': 'source_path', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 2047 },
-        { 'name': 'archive_member', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 2047 },
         { 'name': 'layername', 'typ': ogr.OFTString,
-          'isNullable': False, 'width': 255 },
-        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
-          'isNullable': True },
+          'nullable': False, 'unique': True, 'width': 255 },
+        { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+          'nullable': False },
     ]
 
     m = len(fields)
     n = defn.GetFieldCount()
     if n < m:
-        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
-                        name, n, m)
+        # this is fatal, and `all(bs)` is False so we return False below
+        logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
     elif n != m:
-        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
-    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
-        return None
+        logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+    bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+    if not all(bs):
+        return False
 
     n = defn.GetGeomFieldCount()
     if n > 0:
         geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                            for i in range(n) ]
-        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+        logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
                         name, n, ', '.join(geomFieldNames))
 
     if gdalVersionMin(maj=3, min=5):
         style = lyr.GetStyleTable()
         if style is not None:
-            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+            logging.warning('Layer cache "%s" has a style table "%s"',
                             name, style.GetLastStyleName())
-    return lyr
-
-def getSourcesMtimeNS(sources : dict[str,Any],
-                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
-    """Return a dictionary mapping each source path to its last modification
-    time (in nanoseconds), or None if said source path is not a regular file."""
-    mtimes_ns = {}
+    return True
+
+def areSourceFilesNewer(layername : str,
+                        sources : dict[str,Any],
+                        cache : ogr.Layer,
+                        cachedir : Optional[Path] = None) -> bool:
+    """Return a boolean indicating whether the layer cache is up to date with
+    respect to the source files found on disk.  That is, the last modification
+    and last changed time of each source file needs to be equal or lower than
+    the `last_updated` value found in the layer cache."""
+
+    source_paths = set()
     for source in sources:
         # the same source_path can be used multiple times, stat(2) only once
         source_path = source['source']['path']
-        mtimes_ns[source_path] = None
-    for source_path in mtimes_ns:
+        source_paths.add(source_path)
+    if len(source_paths) == 0:
+        return False
+
+    t = None
+    mtimes_ns = {}
+    for source_path in source_paths:
         path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
         try:
             st = os.stat(path)
             if not S_ISREG(st.st_mode):
                 raise FileNotFoundError
             mtimes_ns[source_path] = st.st_mtime_ns
+            # take max(mtime, ctime): if we lock source paths any update after
+            # acquiring the lock will yield a value larger than time.time_ns()
+            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+            if t is None or t < t2:
+                t = t2
         except (OSError, ValueError):
             #logging.warning('Could not stat(%s)', path)
-            pass
-    return mtimes_ns
-
-def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
-                           mtimes_ns : dict[str,int|None]) -> bool:
-    """Return a boolean indicating whether the source cache layer/table is up to
-    date with respect to the dictionary mapping each source path to its last
-    modification time.  That is, every triplet (source_path, archive_member,
-    layername) needs to be present in cache, and the corresponding mtime_ns
-    needs to match the stat(2) output of the source file on disk."""
-    ks = set()
-    for source in sources:
-        source_path, archive_member, layername = k = getSourceCacheKey(source)
-        if k in ks:
-            raise BadConfiguration(f'Duplicate key {k}')
-        ks.add(k)
+            return True
+    assert t is not None
 
-    if len(ks) == 0:
-        return False
+    attributeFilter = 'layername = ' + escape_literal_str(layername)
+    logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+    cache.SetAttributeFilter(attributeFilter)
 
-    attributeFilter = []
-    for source_path, archive_member, layername in ks:
-        attributeFilter.append('(' + escape_literal_str(source_path) + ','
-                                   + escape_literal_str(archive_member) + ','
-                                   + escape_literal_str(layername) + ')')
-    if len(attributeFilter) == 1:
-        attributeFilter = '= ' + attributeFilter[0]
+    feature = cache.GetNextFeature()
+    if feature is None:
+        # not in cache
+        return True
+
+    if not feature.IsFieldSetAndNotNull(1):
+        ret = True
     else:
-        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
-    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
-    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
-    lyr.SetAttributeFilter(attributeFilter)
-
-    cache = {}
-    feature = lyr.GetNextFeature()
-    while feature is not None:
-        k = (
-            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
-            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
-            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+        # [ year, month, day, hour, minute, second, timezone flag ]
+        dt = feature.GetFieldAsDateTime(1)
+        if not gdalVersionMin(maj=3, min=8):
+            tz = None # assume local time
+        elif dt[6] == ogr.TZFLAG_UNKNOWN:
+            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+                            'field #%d "%s", assuming local time', 1,
+                            feature.GetDefnRef().GetFieldDefn(1).GetName())
+            tz = None
+        elif dt[6] == ogr.TZFLAG_LOCALTIME:
+            tz = None
+        elif dt[6] == ogr.TZFLAG_UTC:
+            tz = UTC
+        else:
+            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+        ms, s = modf(dt[5])
+        dt = datetime(
+            year=dt[0],          # including century
+            month=dt[1],         # 01 ≤ month ≤ 12
+            day=dt[2],           # 01 ≤ day ≤ 31
+            hour=dt[3],          # 00 ≤ hour ≤ 23
+            minute=dt[4],        # 00 ≤ minute ≤ 59
+            second=int(s),       # 00 ≤ second ≤ 59
+            microsecond=round(ms*1000000),
+            tzinfo=tz
         )
-        if any(k0 is None for k0 in k):
-            # no column in the key may be NULL
-            raise RuntimeError(f'Bad key {k}')
-        if k in cache:
-            raise RuntimeError(f'Duplicate key {k}')
-        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
-        feature = lyr.GetNextFeature()
-
-    for k in ks:
-        a = cache.get(k, None)
-        b = mtimes_ns[k[0]]
-        if a is None or b is None or a != b:
-            return True
-
-    for source_path in sorted({ p for p,_,_ in ks }):
-        # XXX datetime.fromtimestamp() doesn't support nanosecond input
-        # https://github.com/python/cpython/issues/59648
-        mtime = (mtimes_ns[source_path] // 1000)/1000000.
-        dt = datetime.fromtimestamp(mtime)
-        logging.info('Source file %s is unchanged (last modification time %s)',
-                     source_path, dt.astimezone().isoformat(timespec='seconds'))
-    return False
+        logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername,
+                      dt.isoformat(timespec='microseconds'))
+        ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+    if cache.GetNextFeature() is not None:
+        raise RuntimeError(f'Duplicate key {layername}')
+
+    if not ret:
+        for source_path, mtime_ns in sorted(mtimes_ns.items()):
+            # XXX datetime.fromtimestamp() doesn't support nanosecond input
+            # https://github.com/python/cpython/issues/59648
+            mtime = (mtime_ns // 1000) / 1000000.
+            dt = datetime.fromtimestamp(mtime)
+            logging.info('Source file %s is unchanged (last modification time %s)',
                         source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return ret
 
 def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
     """Place shared locks on each source path and return their respective file
@@ -611,6 +615,7 @@ def main() -> NoReturn:
                                            lockdir=args.lockdir_sources)
 
     if (dso.TestCapability(ogr.ODsCTransactions) and
+            # we need SAVEPOINT support
             dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
         logging.debug('Starting transaction')
         dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
@@ -619,10 +624,13 @@ def main() -> NoReturn:
                       dso.GetDriver().ShortName)
         dsoTransaction = False
 
-    # get source cache layer/table
-    lyr_sourcecache = getSourceCacheLayer(dso,
-                                          config['dataset'].get('sourcecache-layername', None),
-                                          force=args.force)
+    # validate layer cache
+    lyr_cache = config['dataset'].get('layercache', None)
+    if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+        if not args.force:
+            logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+            args.force = True
+        lyr_cache = None
 
     bChanged = False
     rv = 0
@@ -637,14 +645,12 @@ def main() -> NoReturn:
                              'CreateFeature() method')
 
             sources = layerdef['sources']
-            if lyr_sourcecache is None:
-                sourcesMtimeNS = None
-            else:
-                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
-                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
-                                                             mtimes_ns=sourcesMtimeNS)):
-                    logging.info('Output layer "%s" is up to date, skipping', layername)
-                    continue
+            if not (lyr_cache is None or args.force or
+                    areSourceFilesNewer(layername, sources=sources,
+                                        cache=dso.GetLayerByName(lyr_cache),
+                                        cachedir=cachedir)):
+                logging.info('Output layer "%s" is up to date, skipping', layername)
+                continue
 
             validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
@@ -665,17 +671,28 @@ def main() -> NoReturn:
                 lyrTransaction = False
 
             try:
-                clearLayer(dso, lyr) # TODO conditional (only if not new)?
-
                 description = layerdef.get('description', None)
                 if (description is not None and
                         lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                     logging.warning('Could not set description metadata')
 
-                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
-                                          mtimes=sourcesMtimeNS,
-                                          lyr_sourcecache=lyr_sourcecache,
-                                          cachedir=cachedir, extent=extent)
+                now = datetime.now().astimezone()
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
+                importSources(lyr_dst=lyr, sources=sources,
+                              cachedir=cachedir, extent=extent)
+
+                # force the PG driver to call EndCopy() to detect errors and trigger a
+                # rollback if needed
+                dso.FlushCache()
+
+                if lyr_cache is None:
+                    bChanged0 = True
+                else:
+                    updateLayerCache(layername=layername,
+                                     cache=dso.GetLayerByName(lyr_cache),
+                                     last_updated=now)
+                    bChanged0 = True # TODO fingerprint lyr to detect changes
+                    dso.FlushCache()
                 bChanged = bChanged or bChanged0
 
                 if isinstance(lyrTransaction, str):
@@ -732,7 +749,6 @@ def main() -> NoReturn:
             sys.exit(1)
 
     finally:
-        lyr_sourcecache = None
         dso = None
         srs = None
         extent = None