author Guilhem Moulin <guilhem@fripost.org> 2025-04-23 17:55:57 +0200
committer Guilhem Moulin <guilhem@fripost.org> 2025-04-24 16:49:37 +0200
commit 80882acd6ba407847fed0ef308e440b88796e0e1 (patch)
tree 7084e4508c9ad3aa72c034735e8cbc85738af93c /webmap-import
parent c42245fc566eab01dd2e1f4b07bcbef7432c89c1 (diff)
Change layer cache logic to target destination layers rather than sources.
In a future commit we'll fingerprint layers to detect changes. Comparing modification times is not enough, since some sources (for instance Naturvårdsverket's SCI_Rikstackande) are updated on the server even though no objects are added: the source layer remains unchanged, but the file differs because of OBJECTID changes we are not interested in.

Rather than using another cache layer/table for fingerprints, we cache destination layer names instead of (source_path, archive_member, layername) triplets, along with the time at which the import was started instead of source_path's mtime. There is indeed no value in keeping source_path's exact mtime in the cache; all we need is a way to detect whether source paths have been updated in a subsequent run. Thanks to the shared locks, the ctime of any updated source path will be at least the time at which the locks are released, thereby exceeding the last_updated value.
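The gist of the new staleness check, as a minimal sketch (the function name and the bare arguments are illustrative; the actual logic lives in areSourceFilesNewer() in the diff below, which also reads `last_updated` from the cache table):

    import os
    from stat import S_ISREG
    from datetime import datetime

    def source_files_newer(source_paths: set[str], last_updated: datetime) -> bool:
        """Sketch: return True when the cache entry is stale, i.e. when some
        source path was modified or changed after `last_updated` (the time at
        which the previous import was started)."""
        last_updated_ns = int(last_updated.timestamp() * 1_000_000) * 1000
        for path in source_paths:
            try:
                st = os.stat(path)
                if not S_ISREG(st.st_mode):
                    raise FileNotFoundError
            except OSError:
                return True  # missing or unreadable source: treat as stale
            # unlike mtime, ctime can't be set backwards; and thanks to the
            # shared locks, any update racing with the import bumps ctime past
            # the time the locks are released, hence past last_updated
            if max(st.st_mtime_ns, st.st_ctime_ns) > last_updated_ns:
                return True
        return False

The comparison is done at nanosecond granularity to match stat(2) output, even though the cached timestamp itself only carries microsecond precision.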
Diffstat (limited to 'webmap-import')
-rwxr-xr-x  webmap-import  280
1 file changed, 148 insertions(+), 132 deletions(-)
diff --git a/webmap-import b/webmap-import
index fd10705..0bf9c9e 100755
--- a/webmap-import
+++ b/webmap-import
@@ -28,7 +28,8 @@ from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
-from datetime import datetime
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
@@ -65,7 +66,7 @@ from import_source import (
validateOutputLayer,
clearLayer,
importSources,
- getSourceCacheKey
+ updateLayerCache
)
def setFieldIf(cond : bool,
@@ -284,13 +285,14 @@ def validate_sources(layers : dict[str, Any]) -> None:
for layername in toremove:
layers.pop(layername)
-def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
- name : str,
- typ : int,
- subtyp : int = ogr.OFSTNone,
- width : int = 0,
- isNullable : Optional[bool] = None) -> bool:
- """Validate field #idx from the source cache layer/table."""
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ unique : Optional[bool] = None,
+ nullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the layer cache table."""
n = defn.GetFieldCount()
if idx >= n:
return False
@@ -299,173 +301,175 @@ def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
b = True
name2 = defn.GetName()
if name2 != name:
- logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
- idx, name2, name)
+ logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
b = False
- if isNullable is not None and defn.IsNullable() != isNullable:
+ if nullable is not None and defn.IsNullable() != nullable:
# non-fatal
- logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
- idx, name2, 'is' if defn.IsNullable() else 'is not')
+ logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+ if unique is not None and defn.IsUnique() != unique:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+ idx, name2, 'is' if defn.IsUnique() else 'isn\'t')
typ2 = defn.GetType()
if typ2 != typ:
- logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
- idx, name2,
+ logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
b = False
subtyp2 = defn.GetSubType()
if subtyp2 != subtyp:
- logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
- 'ignoring cache', idx, name2,
+ logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
b = False
width2 = defn.GetWidth()
if width2 != 0 and (width == 0 or width2 < width):
# non-fatal
- logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+ logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
idx, name2, width2, width)
return b
-def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
- force : bool = False) -> ogr.Layer|None:
- """Get the source cache layer/table and validate it."""
- if name is None:
- return None
-
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+ """Validate the layer cache table."""
lyr = ds.GetLayerByName(name)
if lyr is None:
- if not force: # show a warning if args.force is not set
- logging.warning('Table "%s" does not exist, implying --force', name)
- return None
+ logging.warning('Table "%s" does not exist', name)
+ return False
# if not (lyr.TestCapability(ogr.OLCRandomWrite) and
-# gdalVersionMin(maj=3, min=9) and
+# gdalVersionMin(maj=3, min=7) and
# lyr.TestCapability(ogr.OLCUpdateFeature)):
-# # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
-# # which was added in 3.9
# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
# 'ignoring cache', name)
# return None
defn = lyr.GetLayerDefn()
fields = [
- { 'name': 'source_path', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 2047 },
- { 'name': 'archive_member', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 2047 },
{ 'name': 'layername', 'typ': ogr.OFTString,
- 'isNullable': False, 'width': 255 },
- { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
- 'isNullable': True },
+ 'nullable': False, 'unique': True, 'width': 255 },
+ { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+ 'nullable': False },
]
m = len(fields)
n = defn.GetFieldCount()
if n < m:
- logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
- name, n, m)
+ # this is fatal, and `all(bs)` is False so we return False below
+ logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
elif n != m:
- logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
- if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
- return None
+ logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+ bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+ if not all(bs):
+ return False
n = defn.GetGeomFieldCount()
if n > 0:
geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
for i in range(n) ]
- logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+ logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
name, n, ', '.join(geomFieldNames))
if gdalVersionMin(maj=3, min=5):
style = lyr.GetStyleTable()
if style is not None:
- logging.warning('Source cache layer/table "%s" has a style table "%s"',
+ logging.warning('Layer cache "%s" has a style table "%s"',
name, style.GetLastStyleName())
- return lyr
-
-def getSourcesMtimeNS(sources : dict[str,Any],
- cachedir : Optional[Path] = None) -> dict[str,int|None]:
- """Return a dictionary mapping each source path to its last modification
- time (in nanoseconds), or None if said source path is not a regular file."""
- mtimes_ns = {}
+ return True
+
+def areSourceFilesNewer(layername : str,
+ sources : dict[str,Any],
+ cache : ogr.Layer,
+ cachedir : Optional[Path] = None) -> bool:
+ """Return a boolean indicating whether some source file is newer than the
+ layer cache entry, i.e. whether the cache is stale. That is, return True
+ unless the last modification and last change times of every source file
+ are equal to or lower than the `last_updated` value found in the layer cache."""
+
+ source_paths = set()
for source in sources:
# the same source_path can be used multiple times, stat(2) only once
source_path = source['source']['path']
- mtimes_ns[source_path] = None
- for source_path in mtimes_ns:
+ source_paths.add(source_path)
+ if len(source_paths) == 0:
+ return False
+
+ t = None
+ mtimes_ns = {}
+ for source_path in source_paths:
path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
try:
st = os.stat(path)
if not S_ISREG(st.st_mode):
raise FileNotFoundError
mtimes_ns[source_path] = st.st_mtime_ns
+ # take max(mtime, ctime): if we lock source paths any update after
+ # acquiring the lock will yield a value larger than time.time_ns()
+ t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+ if t is None or t < t2:
+ t = t2
except (OSError, ValueError):
#logging.warning('Could not stat(%s)', path)
- pass
- return mtimes_ns
-
-def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
- mtimes_ns : dict[str,int|None]) -> bool:
- """Return a boolean indicating whether the source cache layer/table is up to
- date with respect to the dictionary mapping each source path to its last
- modification time. That is, every triplet (source_path, archive_member,
- layername) needs to be present in cache, and the corresponding mtime_ns
- needs to match the stat(2) output of the source file on disk."""
- ks = set()
- for source in sources:
- source_path, archive_member, layername = k = getSourceCacheKey(source)
- if k in ks:
- raise BadConfiguration(f'Duplicate key {k}')
- ks.add(k)
+ return True
+ assert t is not None
- if len(ks) == 0:
- return False
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
+ cache.SetAttributeFilter(attributeFilter)
- attributeFilter = []
- for source_path, archive_member, layername in ks:
- attributeFilter.append('(' + escape_literal_str(source_path) + ','
- + escape_literal_str(archive_member) + ','
- + escape_literal_str(layername) + ')')
- if len(attributeFilter) == 1:
- attributeFilter = '= ' + attributeFilter[0]
+ feature = cache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ return True
+
+ if not feature.IsFieldSetAndNotNull(1):
+ ret = True
else:
- attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
- attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
- logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
- lyr.SetAttributeFilter(attributeFilter)
-
- cache = {}
- feature = lyr.GetNextFeature()
- while feature is not None:
- k = (
- feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
- feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
- feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+ # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+ # [ year, month, day, hour, minute, second, timezone flag ]
+ dt = feature.GetFieldAsDateTime(1)
+ if not gdalVersionMin(maj=3, min=8):
+ tz = None # assume local time
+ elif dt[6] == ogr.TZFLAG_UNKNOWN:
+ logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+ 'field #%d "%s", assuming local time', 1,
+ feature.GetDefnRef().GetFieldDefn(1).GetName())
+ tz = None
+ elif dt[6] == ogr.TZFLAG_LOCALTIME:
+ tz = None
+ elif dt[6] == ogr.TZFLAG_UTC:
+ tz = UTC
+ else:
+ tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+ ms, s = modf(dt[5])
+ dt = datetime(
+ year=dt[0], # including century
month=dt[1], # 01 ≤ month ≤ 12
+ day=dt[2], # 01 ≤ day ≤ 31
+ hour=dt[3], # 00 ≤ hour ≤ 23
+ minute=dt[4], # 00 ≤ minute ≤ 59
+ second=int(s), # 00 ≤ second ≤ 59
+ microsecond=round(ms*1000000),
+ tzinfo=tz
)
- if any(k0 is None for k0 in k):
- # no column in the the key may be NULL
- raise RuntimeError(f'Bad key {k}')
- if k in cache:
- raise RuntimeError(f'Duplicate key {k}')
- cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
- feature = lyr.GetNextFeature()
-
- for k in ks:
- a = cache.get(k, None)
- b = mtimes_ns[k[0]]
- if a is None or b is None or a != b:
- return True
-
- for source_path in sorted({ p for p,_,_ in ks }):
- # XXX datetime.fromtimestamp() doesn't support nanosecond input
- # https://github.com/python/cpython/issues/59648
- mtime = (mtimes_ns[source_path] // 1000)/1000000.
- dt = datetime.fromtimestamp(mtime)
- logging.info('Source file %s is unchanged (last modification time %s)',
- source_path, dt.astimezone().isoformat(timespec='seconds'))
- return False
+ logging.debug('Found entry in layer cache for "%s", last_updated=%s', layername,
+ dt.isoformat(timespec='microseconds'))
+ ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+ if cache.GetNextFeature() is not None:
+ raise RuntimeError(f'Duplicate key {layername}')
+
+ if not ret:
+ for source_path, mtime_ns in sorted(mtimes_ns.items()):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtime_ns // 1000) / 1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return ret
def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
"""Place shared locks on each source path and return their respective file
@@ -611,6 +615,7 @@ def main() -> NoReturn:
lockdir=args.lockdir_sources)
if (dso.TestCapability(ogr.ODsCTransactions) and
+ # we need SAVEPOINT support
dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
logging.debug('Starting transaction')
dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
@@ -619,10 +624,13 @@ def main() -> NoReturn:
dso.GetDriver().ShortName)
dsoTransaction = False
- # get source cache layer/table
- lyr_sourcecache = getSourceCacheLayer(dso,
- config['dataset'].get('sourcecache-layername', None),
- force=args.force)
+ # validate layer cache
+ lyr_cache = config['dataset'].get('layercache', None)
+ if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+ if not args.force:
+ logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+ args.force = True
+ lyr_cache = None
bChanged = False
rv = 0
@@ -637,14 +645,12 @@ def main() -> NoReturn:
'CreateFeature() method')
sources = layerdef['sources']
- if lyr_sourcecache is None:
- sourcesMtimeNS = None
- else:
- sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
- if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
- mtimes_ns=sourcesMtimeNS)):
- logging.info('Output layer "%s" is up to date, skipping', layername)
- continue
+ if not (lyr_cache is None or args.force or
+ areSourceFilesNewer(layername, sources=sources,
+ cache=dso.GetLayerByName(lyr_cache),
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ continue
validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
@@ -665,17 +671,28 @@ def main() -> NoReturn:
lyrTransaction = False
try:
- clearLayer(dso, lyr) # TODO conditional (only if not new)?
-
description = layerdef.get('description', None)
if (description is not None and
lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
logging.warning('Could not set description metadata')
- bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
- mtimes=sourcesMtimeNS,
- lyr_sourcecache=lyr_sourcecache,
- cachedir=cachedir, extent=extent)
+ now = datetime.now().astimezone()
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+ importSources(lyr_dst=lyr, sources=sources,
+ cachedir=cachedir, extent=extent)
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyr_cache is None:
+ bChanged0 = True
+ else:
+ updateLayerCache(layername=layername,
+ cache=dso.GetLayerByName(lyr_cache),
+ last_updated=now)
+ bChanged0 = True # TODO fingerprint lyr to detect changes
+ dso.FlushCache()
bChanged = bChanged or bChanged0
if isinstance(lyrTransaction, str):
@@ -732,7 +749,6 @@ def main() -> NoReturn:
sys.exit(1)
finally:
- lyr_sourcecache = None
dso = None
srs = None
extent = None