aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-23 17:55:57 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-24 16:49:37 +0200
commit80882acd6ba407847fed0ef308e440b88796e0e1 (patch)
tree7084e4508c9ad3aa72c034735e8cbc85738af93c
parentc42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff)
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes. Comparing modification times is not enough since some sources (for instance Naturvårdsverket's SCI_Rikstackande) are updated on the server even though no objects are being added; the source layer remains unchanged but the file differs because of OBJECTID changes we are not interested in. Rather than using another cache layer/table for fingerprints, we cache destination layernames rather than triplets (source_path, archive_member, layername), along with the time at which the import was started rather than source_path's mtime. There is indeed no value in having exact source_path's mtime in the cache. What we need is simply a way to detect whether source paths have been updated in a subsequent run. Thanks to the shared locks the ctime of any updated source path will be at least the time when the locks are released, thereby exceeding the last_updated value.
-rw-r--r--config.yml2
-rw-r--r--import_source.py118
-rw-r--r--schema.sql48
-rwxr-xr-xwebmap-import280
4 files changed, 224 insertions, 224 deletions
diff --git a/config.yml b/config.yml
index 8fa6ca7..08ee039 100644
--- a/config.yml
+++ b/config.yml
@@ -108,7 +108,7 @@ dataset:
open-options-publish:
USER: webmap_guest
- sourcecache-layername: public.sourcecache
+ layercache: public.layercache
# Optional dictionary of default layer creation options, cf.
# https://gdal.org/drivers/vector/pg.html#layer-creation-options or
diff --git a/import_source.py b/import_source.py
index 1fa754c..8ce69a9 100644
--- a/import_source.py
+++ b/import_source.py
@@ -25,6 +25,7 @@ import tempfile
import re
from fnmatch import fnmatchcase
from pathlib import Path
+from datetime import datetime, timedelta, UTC
from typing import Any, Optional
from osgeo import gdal, ogr, osr
@@ -38,7 +39,7 @@ from osgeo.gdalconst import (
from osgeo import gdalconst
from common import BadConfiguration, escape_identifier, escape_literal_str
-from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
+from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, formatTZFlag
def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
"""Open and return the output DS. It is created if create=False or
@@ -431,81 +432,18 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
-def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
- """Get the unique triplet (source_path, archive_member, layername)
- for the layer source which can be used for attribute filters and
- lookups."""
- importdef = source['import']
- sourcedef = source['source']
- source_path = sourcedef['path']
- archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
- layername = importdef.get('layername', '')
- return (source_path, archive_member, layername)
-
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
- dso : gdal.Dataset,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- mtimes : dict[str, int|None]|None = None,
- lyr_sourcecache : ogr.Layer|None = None,
- extent : ogr.Geometry|None = None) -> bool:
- """Import source layers."""
- bChanged = False
+ extent : ogr.Geometry|None = None) -> None:
+ """Import source layers to lyr_dst."""
for source in sources:
_importSource(lyr_dst, **source['source'],
args=source['import'],
cachedir=cachedir,
extent=extent)
- # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
- # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
- dso.FlushCache()
-
- if lyr_sourcecache is None:
- bChanged = True
- continue
-
- source_path, archive_member, layername = getSourceCacheKey(source)
- attributeFilter = ('(source_path,archive_member,layername) = ('
- + escape_literal_str(source_path) + ','
- + escape_literal_str(archive_member) + ','
- + escape_literal_str(layername) + ')')
- lyr_sourcecache.SetAttributeFilter(attributeFilter)
-
- feature = lyr_sourcecache.GetNextFeature()
- if feature is not None:
- logging.debug('Updating existing feature in source cache for %s', attributeFilter)
- update = True
- assert lyr_sourcecache.GetNextFeature() is None
- else:
- logging.debug('Creating new feature in source cache for %s', attributeFilter)
- update = False
- feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
- feature.SetFieldString(0, source_path)
- feature.SetFieldString(1, archive_member)
- feature.SetFieldString(2, layername)
-
- mtime_ns = mtimes[source_path]
- if mtime_ns is None:
- feature.SetFieldNull(3)
- else:
- feature.SetFieldInteger64(3, mtime_ns)
-
- if update:
- # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
- if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
- raise RuntimeError('Could not update feature in sourcecache')
- else:
- if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
- raise RuntimeError('Could not create new feature in sourcecache')
-
- # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
- dso.FlushCache()
- bChanged = True # TODO fingerprint the features to detect changes
-
- return bChanged
-
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
path : str = '/nonexistent',
@@ -830,3 +768,51 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
for t,n in sorted(mismatch.items(), key=lambda x: x[1]) ]
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
+
+def updateLayerCache(layername : str, cache : ogr.Layer,
+ last_updated : datetime) -> None:
+ """Update attributes in the layer cache for the given layer name."""
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+ cache.SetAttributeFilter(attributeFilter)
+
+ feature = cache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ logging.debug('Creating new feature in layer cache for %s', attributeFilter)
+ update = False
+ feature = ogr.Feature(cache.GetLayerDefn())
+ feature.SetFieldString(0, layername)
+ else:
+ logging.debug('Updating existing feature in layer cache for %s', attributeFilter)
+ update = True
+ assert cache.GetNextFeature() is None
+
+ if not gdalVersionMin(maj=3, min=8):
+ tzFlag = 0 # ogr.TZFLAG_UNKNOWN
+ elif last_updated.tzinfo == UTC:
+ tzFlag = ogr.TZFLAG_UTC
+ else:
+ td = last_updated.utcoffset()
+ # 15min increments/decrements per unit above/below UTC, cf.
+ # https://gdal.org/en/stable/api/vector_c_api.html#c.OGR_TZFLAG_UTC
+ tzFlag = td.days * 96 + td.seconds // 900
+ if timedelta(seconds=tzFlag*900) != td or abs(tzFlag) > 56: # max ±14:00
+ raise RuntimeError(f'Invalid UTC offset {td}')
+ tzFlag += ogr.TZFLAG_UTC
+
+ feature.SetField(1, last_updated.year,
+ last_updated.month,
+ last_updated.day,
+ last_updated.hour,
+ last_updated.minute,
+ float(last_updated.second) + float(last_updated.microsecond)/1000000.,
+ tzFlag)
+
+ if update:
+ # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+ if cache.SetFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not update feature in layer cache')
+ else:
+ if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not create new feature in layer cache')
diff --git a/schema.sql b/schema.sql
index b4d7608..e5a1002 100644
--- a/schema.sql
+++ b/schema.sql
@@ -2751,32 +2751,30 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
--
--- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+-- Name: layercache; Type: TABLE; Schema: public; Owner: webmap_import
--
-CREATE TABLE public.sourcecache (
+CREATE TABLE public.layercache (
ogc_fid bigint NOT NULL,
- source_path character varying(2047) NOT NULL,
- archive_member character varying(2047) NOT NULL,
layername character varying(255) NOT NULL,
- mtime_ns bigint
+ last_updated timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
);
-ALTER TABLE public.sourcecache OWNER TO webmap_import;
+ALTER TABLE public.layercache OWNER TO webmap_import;
--
--- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+-- Name: TABLE layercache; Type: COMMENT; Schema: public; Owner: webmap_import
--
-COMMENT ON TABLE public.sourcecache IS 'Metadata cache for sources files';
+COMMENT ON TABLE public.layercache IS 'Layer metadata cache';
--
--- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+-- Name: layercache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
--
-CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+CREATE SEQUENCE public.layercache_ogc_fid_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
@@ -2784,13 +2782,13 @@ CREATE SEQUENCE public.sourcecache_ogc_fid_seq
CACHE 1;
-ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+ALTER TABLE public.layercache_ogc_fid_seq OWNER TO webmap_import;
--
--- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+-- Name: layercache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
--
-ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+ALTER SEQUENCE public.layercache_ogc_fid_seq OWNED BY public.layercache.ogc_fid;
--
@@ -3165,10 +3163,10 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT ne
--
--- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+-- Name: layercache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
--
-ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+ALTER TABLE ONLY public.layercache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.layercache_ogc_fid_seq'::regclass);
--
@@ -4044,19 +4042,19 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
--
--- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+-- Name: layercache layercache_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
--
-ALTER TABLE ONLY public.sourcecache
- ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+ALTER TABLE ONLY public.layercache
+ ADD CONSTRAINT layercache_layername_key UNIQUE (layername);
--
--- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+-- Name: layercache layercache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
--
-ALTER TABLE ONLY public.sourcecache
- ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+ALTER TABLE ONLY public.layercache
+ ADD CONSTRAINT layercache_pkey PRIMARY KEY (ogc_fid);
--
@@ -5210,17 +5208,17 @@ GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap
--
--- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+-- Name: TABLE layercache; Type: ACL; Schema: public; Owner: webmap_import
--
-GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+GRANT SELECT ON TABLE public.layercache TO webmap_guest;
--
--- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+-- Name: SEQUENCE layercache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
--
-GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+GRANT SELECT,USAGE ON SEQUENCE public.layercache_ogc_fid_seq TO webmap_guest;
--
diff --git a/webmap-import b/webmap-import
index fd10705..0bf9c9e 100755
--- a/webmap-import
+++ b/webmap-import
@@ -28,7 +28,8 @@ from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
-from datetime import datetime
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
@@ -65,7 +66,7 @@ from import_source import (
validateOutputLayer,
clearLayer,
importSources,
- getSourceCacheKey
+ updateLayerCache
)
def setFieldIf(cond : bool,
@@ -284,13 +285,14 @@ def validate_sources(layers : dict[str, Any]) -> None:
for layername in toremove:
layers.pop(layername)
-def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
- name : str,
- typ : int,
- subtyp : int = ogr.OFSTNone,
- width : int = 0,
- isNullable : Optional[bool] = None) -> bool:
- """Validate field #idx from the source cache layer/table."""
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ unique : Optional[bool] = None,
+ nullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the layer cache table."""
n = defn.GetFieldCount()
if idx >= n:
return False
@@ -299,173 +301,175 @@ def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
b = True
name2 = defn.GetName()
if name2 != name:
- logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
- idx, name2, name)
+ logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
b = False
- if isNullable is not None and defn.IsNullable() != isNullable:
+ if nullable is not None and defn.IsNullable() != nullable:
# non-fatal
- logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
- idx, name2, 'is' if defn.IsNullable() else 'is not')
+ logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+ if unique is not None and defn.IsUnique() != unique:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+ idx, name2, 'is' if defn.IsUnique() else 'isn\'t')
typ2 = defn.GetType()
if typ2 != typ:
- logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
- idx, name2,
+ logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
b = False
subtyp2 = defn.GetSubType()
if subtyp2 != subtyp:
- logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
- 'ignoring cache', idx, name2,
+ logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
b = False
width2 = defn.GetWidth()
if width2 != 0 and (width == 0 or width2 < width):
# non-fatal
- logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+ logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
idx, name2, width2, width)
return b
-def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
- force : bool = False) -> ogr.Layer|None:
- """Get the source cache layer/table and validate it."""
- if name is None:
- return None
-
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+ """Validate layer cache table."""
lyr = ds.GetLayerByName(name)
if lyr is None:
- if not force: # show a warning if args.force is not set
- logging.warning('Table "%s" does not exist, implying --force', name)
- return None
+ logging.warning('Table "%s" does not exist', name)
+ return False
# if not (lyr.TestCapability(ogr.OLCRandomWrite) and
-# gdalVersionMin(maj=3, min=9) and
+# gdalVersionMin(maj=3, min=7) and
# lyr.TestCapability(ogr.OLCUpdateFeature)):
-# # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
-# # which was added in 3.9
# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
# 'ignoring cache', name)
# return None
defn = lyr.GetLayerDefn()
fields = [
- { 'name': 'source_path', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 2047 },
- { 'name': 'archive_member', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 2047 },
{ 'name': 'layername', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 255 },
- { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
- 'isNullable': True },
+ 'nullable': False, 'unique': True, 'width': 255 },
+ { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+ 'nullable': False },
]
m = len(fields)
n = defn.GetFieldCount()
if n < m:
- logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
- name, n, m)
+ # this is fatal, and `all(bs)` is False so we return False below
+ logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
elif n != m:
- logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
- if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
- return None
+ logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+ bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+ if not all(bs):
+ return False
n = defn.GetGeomFieldCount()
if n > 0:
geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
for i in range(n) ]
- logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+ logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
name, n, ', '.join(geomFieldNames))
if gdalVersionMin(maj=3, min=5):
style = lyr.GetStyleTable()
if style is not None:
- logging.warning('Source cache layer/table "%s" has a style table "%s"',
+ logging.warning('Layer cache "%s" has a style table "%s"',
name, style.GetLastStyleName())
- return lyr
-
-def getSourcesMtimeNS(sources : dict[str,Any],
- cachedir : Optional[Path] = None) -> dict[str,int|None]:
- """Return a dictionary mapping each source path to its last modification
- time (in nanoseconds), or None if said source path is not a regular file."""
- mtimes_ns = {}
+ return True
+
+def areSourceFilesNewer(layername : str,
+ sources : dict[str,Any],
+ cache : ogr.Layer,
+ cachedir : Optional[Path] = None) -> bool:
+ """Return a boolean indicating whether the layer cache is up to date with
+ respect to the source files found on disk. That is, the last modification
+ and last changed time of each source file needs to be equal or lower than
+ the `last_updated` value found in the layer cache."""
+
+ source_paths = set()
for source in sources:
# the same source_path can be used multiple times, stat(2) only once
source_path = source['source']['path']
- mtimes_ns[source_path] = None
- for source_path in mtimes_ns:
+ source_paths.add(source_path)
+ if len(source_paths) == 0:
+ return False
+
+ t = None
+ mtimes_ns = {}
+ for source_path in source_paths:
path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
try:
st = os.stat(path)
if not S_ISREG(st.st_mode):
raise FileNotFoundError
mtimes_ns[source_path] = st.st_mtime_ns
+ # take max(mtime, ctime): if we lock source paths any update after
+            # acquiring the lock will yield a value larger than time.time_ns()
+ t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+ if t is None or t < t2:
+ t = t2
except (OSError, ValueError):
#logging.warning('Could not stat(%s)', path)
- pass
- return mtimes_ns
-
-def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
- mtimes_ns : dict[str,int|None]) -> bool:
- """Return a boolean indicating whether the source cache layer/table is up to
- date with respect to the dictionary mapping each source path to its last
- modification time. That is, every triplet (source_path, archive_member,
- layername) needs to be present in cache, and the corresponding mtime_ns
- needs to match the stat(2) output of the source file on disk."""
- ks = set()
- for source in sources:
- source_path, archive_member, layername = k = getSourceCacheKey(source)
- if k in ks:
- raise BadConfiguration(f'Duplicate key {k}')
- ks.add(k)
+ return True
+ assert t is not None
- if len(ks) == 0:
- return False
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+ cache.SetAttributeFilter(attributeFilter)
- attributeFilter = []
- for source_path, archive_member, layername in ks:
- attributeFilter.append('(' + escape_literal_str(source_path) + ','
- + escape_literal_str(archive_member) + ','
- + escape_literal_str(layername) + ')')
- if len(attributeFilter) == 1:
- attributeFilter = '= ' + attributeFilter[0]
+ feature = cache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ return True
+
+ if not feature.IsFieldSetAndNotNull(1):
+ ret = True
else:
- attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
- attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
- logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
- lyr.SetAttributeFilter(attributeFilter)
-
- cache = {}
- feature = lyr.GetNextFeature()
- while feature is not None:
- k = (
- feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
- feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
- feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+ # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+ # [ year, month, day, hour, minute, second, timezone flag ]
+ dt = feature.GetFieldAsDateTime(1)
+ if not gdalVersionMin(maj=3, min=8):
+ tz = None # assume local time
+ elif dt[6] == ogr.TZFLAG_UNKNOWN:
+ logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+ 'field #%d "%s", assuming local time', 1,
+ feature.GetDefnRef().GetFieldDefn(1).GetName())
+ tz = None
+ elif dt[6] == ogr.TZFLAG_LOCALTIME:
+ tz = None
+ elif dt[6] == ogr.TZFLAG_UTC:
+ tz = UTC
+ else:
+ tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+ ms, s = modf(dt[5])
+ dt = datetime(
+ year=dt[0], # including century
+            month=dt[1], # 01 ≤ month ≤ 12
+ day=dt[2], # 01 ≤ day ≤ 31
+ hour=dt[3], # 00 ≤ hour ≤ 23
+ minute=dt[4], # 00 ≤ minute ≤ 59
+ second=int(s), # 00 ≤ second ≤ 59
+ microsecond=round(ms*1000000),
+ tzinfo=tz
)
- if any(k0 is None for k0 in k):
- # no column in the the key may be NULL
- raise RuntimeError(f'Bad key {k}')
- if k in cache:
- raise RuntimeError(f'Duplicate key {k}')
- cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
- feature = lyr.GetNextFeature()
-
- for k in ks:
- a = cache.get(k, None)
- b = mtimes_ns[k[0]]
- if a is None or b is None or a != b:
- return True
-
- for source_path in sorted({ p for p,_,_ in ks }):
- # XXX datetime.fromtimestamp() doesn't support nanosecond input
- # https://github.com/python/cpython/issues/59648
- mtime = (mtimes_ns[source_path] // 1000)/1000000.
- dt = datetime.fromtimestamp(mtime)
- logging.info('Source file %s is unchanged (last modification time %s)',
- source_path, dt.astimezone().isoformat(timespec='seconds'))
- return False
+ logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername,
+ dt.isoformat(timespec='microseconds'))
+ ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+ if cache.GetNextFeature() is not None:
+ raise RuntimeError(f'Duplicate key {layername}')
+
+ if not ret:
+ for source_path, mtime_ns in sorted(mtimes_ns.items()):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtime_ns // 1000) / 1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return ret
def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
"""Place shared locks on each source path and return their respective file
@@ -611,6 +615,7 @@ def main() -> NoReturn:
lockdir=args.lockdir_sources)
if (dso.TestCapability(ogr.ODsCTransactions) and
+ # we need SAVEPOINT support
dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
logging.debug('Starting transaction')
dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
@@ -619,10 +624,13 @@ def main() -> NoReturn:
dso.GetDriver().ShortName)
dsoTransaction = False
- # get source cache layer/table
- lyr_sourcecache = getSourceCacheLayer(dso,
- config['dataset'].get('sourcecache-layername', None),
- force=args.force)
+ # validate layer cache
+ lyr_cache = config['dataset'].get('layercache', None)
+ if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+ if not args.force:
+ logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+ args.force = True
+ lyr_cache = None
bChanged = False
rv = 0
@@ -637,14 +645,12 @@ def main() -> NoReturn:
'CreateFeature() method')
sources = layerdef['sources']
- if lyr_sourcecache is None:
- sourcesMtimeNS = None
- else:
- sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
- if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
- mtimes_ns=sourcesMtimeNS)):
- logging.info('Output layer "%s" is up to date, skipping', layername)
- continue
+ if not (lyr_cache is None or args.force or
+ areSourceFilesNewer(layername, sources=sources,
+ cache=dso.GetLayerByName(lyr_cache),
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ continue
validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
@@ -665,17 +671,28 @@ def main() -> NoReturn:
lyrTransaction = False
try:
- clearLayer(dso, lyr) # TODO conditional (only if not new)?
-
description = layerdef.get('description', None)
if (description is not None and
lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
logging.warning('Could not set description metadata')
- bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
- mtimes=sourcesMtimeNS,
- lyr_sourcecache=lyr_sourcecache,
- cachedir=cachedir, extent=extent)
+ now = datetime.now().astimezone()
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+ importSources(lyr_dst=lyr, sources=sources,
+ cachedir=cachedir, extent=extent)
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyr_cache is None:
+ bChanged0 = True
+ else:
+ updateLayerCache(layername=layername,
+ cache=dso.GetLayerByName(lyr_cache),
+ last_updated=now)
+ bChanged0 = True # TODO fingerprint lyr to detect changes
+ dso.FlushCache()
bChanged = bChanged or bChanged0
if isinstance(lyrTransaction, str):
@@ -732,7 +749,6 @@ def main() -> NoReturn:
sys.exit(1)
finally:
- lyr_sourcecache = None
dso = None
srs = None
extent = None