Diffstat (limited to 'webmap-import')
-rwxr-xr-x  webmap-import | 361
1 file changed, 326 insertions(+), 35 deletions(-)
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
# pylint: disable=invalid-name, missing-module-docstring, fixme
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
+from stat import S_ISREG
import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
+from datetime import datetime
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
from osgeo import gdalconst
import common
-from common import BadConfiguration, escape_identifier
+from common import (
+    BadConfiguration,
+    escape_identifier,
+    escape_literal_str,
+    getSourcePathLockFileName
+)
from common_gdal import (
    gdalVersionMin,
    gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
    createOutputLayer,
    validateOutputLayer,
    clearLayer,
-    importSources
+    importSources,
+    getSourceCacheKey
)
def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
    for layername in toremove:
        layers.pop(layername)
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+                             name : str,
+                             typ : int,
+                             subtyp : int = ogr.OFSTNone,
+                             width : int = 0,
+                             isNullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the source cache layer/table."""
+    n = defn.GetFieldCount()
+    if idx >= n:
+        return False
+    defn = defn.GetFieldDefn(idx)
+
+    b = True
+    name2 = defn.GetName()
+    if name2 != name:
+        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+                        idx, name2, name)
+        b = False
+
+    if isNullable is not None and defn.IsNullable() != isNullable:
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+    typ2 = defn.GetType()
+    if typ2 != typ:
+        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+                        idx, name2,
+                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+        b = False
+
+    subtyp2 = defn.GetSubType()
+    if subtyp2 != subtyp:
+        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+                        'ignoring cache', idx, name2,
+                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+        b = False
+
+    width2 = defn.GetWidth()
+    if width2 != 0 and (width == 0 or width2 < width):
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+                        idx, name2, width2, width)
+    return b
+
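+# Illustrative sketch (not part of the module): the mtime_ns field of the
+# cache schema defined below would be checked with
+#   validateSourceCacheField(defn, 3, name='mtime_ns', typ=ogr.OFTInteger64,
+#                            isNullable=True)
+# which returns False on name/type/subtype mismatches, while width and
+# nullability mismatches merely log a warning.
+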
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+                        force : bool = False) -> ogr.Layer|None:
+    """Get the source cache layer/table and validate it."""
+    if name is None:
+        return None
+
+    lyr = ds.GetLayerByName(name)
+    if lyr is None:
+        if not force: # show a warning if args.force is not set
+            logging.warning('Table "%s" does not exist, implying --force', name)
+        return None
+
+#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+#            gdalVersionMin(maj=3, min=9) and
+#            lyr.TestCapability(ogr.OLCUpdateFeature)):
+#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+#        # which was added in 3.9
+#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+#                        'ignoring cache', name)
+#        return None
+
+    defn = lyr.GetLayerDefn()
+    fields = [
+        { 'name': 'source_path', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'archive_member', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'layername', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 255 },
+        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+          'isNullable': True },
+    ]
+    m = len(fields)
+    n = defn.GetFieldCount()
+    if n < m:
+        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+                        name, n, m)
+    elif n != m:
+        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+        return None
+
+    n = defn.GetGeomFieldCount()
+    if n > 0:
+        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+                           for i in range(n) ]
+        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+                        name, n, ', '.join(geomFieldNames))
+
+    if gdalVersionMin(maj=3, min=5):
+        style = lyr.GetStyleTable()
+        if style is not None:
+            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+                            name, style.GetLastStyleName())
+    return lyr
+
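+# Hypothetical configuration sketch: the cache table is looked up via the
+# 'sourcecache-layername' key (see main() below), e.g. assuming a YAML-style
+# config layout:
+#   dataset:
+#     sourcecache-layername: sourcecache
+# getSourceCacheLayer() then returns None (implying --force) whenever the
+# table is missing or its schema doesn't validate.
+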
+def getSourcesMtimeNS(sources : dict[str,Any],
+                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
+    """Return a dictionary mapping each source path to its last modification
+    time (in nanoseconds), or None if said source path is not a regular file."""
+    mtimes_ns = {}
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        mtimes_ns[source_path] = None
+    for source_path in mtimes_ns:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            pass
+    return mtimes_ns
+
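+# Example with made-up paths: duplicate source paths are stat(2)'d only once,
+# and missing or non-regular files map to None:
+#   getSourcesMtimeNS([{'source': {'path': 'a.zip'}},
+#                      {'source': {'path': 'a.zip'}},
+#                      {'source': {'path': 'gone.zip'}}])
+#   # -> {'a.zip': 1718000000000000000, 'gone.zip': None}
+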
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+                           mtimes_ns : dict[str,int|None]) -> bool:
+    """Return a boolean indicating whether the source cache layer/table is up to
+    date with respect to the dictionary mapping each source path to its last
+    modification time.  That is, every triplet (source_path, archive_member,
+    layername) needs to be present in cache, and the corresponding mtime_ns
+    needs to match the stat(2) output of the source file on disk."""
+    ks = set()
+    for source in sources:
+        source_path, archive_member, layername = k = getSourceCacheKey(source)
+        if k in ks:
+            raise BadConfiguration(f'Duplicate key {k}')
+        ks.add(k)
+
+    if len(ks) == 0:
+        return False
+
+    attributeFilter = []
+    for source_path, archive_member, layername in ks:
+        attributeFilter.append('(' + escape_literal_str(source_path) + ','
+                                   + escape_literal_str(archive_member) + ','
+                                   + escape_literal_str(layername) + ')')
+    if len(attributeFilter) == 1:
+        attributeFilter = '= ' + attributeFilter[0]
+    else:
+        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+    lyr.SetAttributeFilter(attributeFilter)
+
+    cache = {}
+    feature = lyr.GetNextFeature()
+    while feature is not None:
+        k = (
+            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        )
+        if any(k0 is None for k0 in k):
+            # no column in the key may be NULL
+            raise RuntimeError(f'Bad key {k}')
+        if k in cache:
+            raise RuntimeError(f'Duplicate key {k}')
+        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+        feature = lyr.GetNextFeature()
+
+    for k in ks:
+        a = cache.get(k, None)
+        b = mtimes_ns[k[0]]
+        if a is None or b is None or a != b:
+            return True
+
+    for source_path in sorted({ p for p,_,_ in ks }):
+        # XXX datetime.fromtimestamp() doesn't support nanosecond input
+        # https://github.com/python/cpython/issues/59648
+        mtime = (mtimes_ns[source_path] // 1000)/1000000.
+        dt = datetime.fromtimestamp(mtime)
+        logging.info('Source file %s is unchanged (last modification time %s)',
+                     source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return False
+
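+# For two cached triplets the attribute filter built above reads (values are
+# made up, literals escaped with escape_literal_str()):
+#   (source_path,archive_member,layername)
+#       IN (('a.zip','b.shp','layer1'),('a.zip','c.shp','layer2'))
+# and the cache is considered dirty as soon as a triplet is missing or its
+# mtime_ns column differs from the stat(2) result of the file on disk.
+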
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at
+    a time) to reduce the time during which the sources are prevented from
+    being updated/downloaded, but there is some value in having consistency
+    across the whole import process."""
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+
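+# Locking sketch, assuming the downloader takes the corresponding exclusive
+# lock on the same file before updating a source:
+#   fd = os.open(getSourcePathLockFileName(source_path),
+#                O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664, dir_fd=lockdir_fd)
+#   flock(fd, LOCK_EX)  # writer side; the import side above uses LOCK_SH
+# Shared locks let concurrent imports proceed while updates are serialized.
+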
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+    """Release shared locks on the source paths.  Closed FDs are removed from
+    the dictionary in place."""
+    toremove = set()
+    for path, lock_fd in lock_fds.items():
+        try:
+            os.close(lock_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not release lock for %s', path)
+        else:
+            logging.debug('Released lock for %s', path)
+            toremove.add(path)
+    for path in toremove:
+        lock_fds.pop(path)
+
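+# Usage sketch (mirroring main() below): locks are held for the whole import
+# and dropped before the dataset transaction is committed:
+#   sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+#                                     lockdir=args.lockdir_sources)
+#   ...  # import all layers
+#   releaseSourcePathLocks(sourcePathLocks)  # empties the dict in place
+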
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+    """Either commit or rollback the transaction, depending on a condition
+    indicating whether there are pending new changes to be committed."""
+    if commit:
+        logging.debug('Committing transaction')
+        if obj.CommitTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not commit transaction')
+    else:
+        logging.info('No changes detected, rolling back transaction')
+        # don't call rollbackTransactionTry() as we don't want to catch exceptions
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    return False
+
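+# Call pattern used in main() below: commit only when something changed or
+# --force was given, otherwise roll the (no-op) transaction back:
+#   if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
+#       rv = 1  # commit/rollback failure is reported, not fatal
+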
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+    """Try to rollback the current transaction on the dataset or layer,
+    ignoring any exception but logging errors and returning a boolean
+    indicating success."""
+    try:
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    except Exception: # pylint: disable=broad-exception-caught
+        logging.exception('Could not rollback transaction')
+    return False
+
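+# Unlike commitOrRollbackTransactionIf(), this helper is meant for exception
+# handlers, where a rollback failure must not mask the original error, e.g.:
+#   except Exception:
+#       logging.exception('Exception occurred within transaction, rolling back')
+#       rollbackTransactionTry(lyr)
+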
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
-                        help='obtain an exclusive lock before starting unpacking and importing')
+                        help='obtain an exclusive lock before processing')
+    parser.add_argument('--lockdir-sources', default=None,
+                        help='optional directory for lock files on source paths')
+    parser.add_argument('--force', default=False, action='store_true',
+                        help='import even if no new changes were detected')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()
@@ -313,8 +577,8 @@ def main() -> NoReturn:
    dso = openOutputDS(config['dataset'])
    validate_schema(layers,
-            drvo=dso.GetDriver(),
-            lco_defaults=config['dataset'].get('create-layer-options', None))
+                    drvo=dso.GetDriver(),
+                    lco_defaults=config['dataset'].get('create-layer-options', None))
    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
            createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
    cachedir = Path(args.cachedir) if args.cachedir is not None else None
+    if args.lockdir_sources is None:
+        sourcePathLocks = None
+    else:
+        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+                                          lockdir=args.lockdir_sources)
    if (dso.TestCapability(ogr.ODsCTransactions) and
            dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@ def main() -> NoReturn:
                        dso.GetDriver().ShortName)
        dsoTransaction = False
+    # get source cache layer/table
+    lyr_sourcecache = getSourceCacheLayer(dso,
+                                          config['dataset'].get('sourcecache-layername', None),
+                                          force=args.force)
+
+    bChanged = False
    rv = 0
    try:
        for layername, layerdef in layers.items():
@@ -360,9 +635,18 @@ def main() -> NoReturn:
            if not lyr.TestCapability(ogr.OLCSequentialWrite):
                raise RuntimeError(f'Output layer "{layername}" has no working '
                                   'CreateFeature() method')
-            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
            sources = layerdef['sources']
+            if lyr_sourcecache is None:
+                sourcesMtimeNS = None
+            else:
+                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+                                                             mtimes_ns=sourcesMtimeNS)):
+                    logging.info('Output layer "%s" is up to date, skipping', layername)
+                    continue
+
+            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
            # setup output field mapping in the sources dictionary
            setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
                lyrTransaction = False
            try:
-                clearLayer(dso, lyr)
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
                description = layerdef.get('description', None)
                if (description is not None and
                        lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                    logging.warning('Could not set description metadata')
-                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+                                          mtimes=sourcesMtimeNS,
+                                          lyr_sourcecache=lyr_sourcecache,
+                                          cachedir=cachedir, extent=extent)
+                bChanged = bChanged or bChanged0
-                if isinstance(lyrTransaction, bool) and lyrTransaction:
-                    # commit transaction
-                    logging.debug('Committing transaction')
+                if isinstance(lyrTransaction, str):
+                    if not (bChanged0 or args.force):
+                        logging.info('No changes detected, rolling back to previous savepoint')
+                        query = 'ROLLBACK TO ' + lyrTransaction
+                        logging.debug(query)
+                        try:
+                            dso.ExecuteSQL(query)
+                        except Exception: # pylint: disable=broad-exception-caught
+                            logging.exception('Could not %s', query)
+                            rv = 1
+                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                    lyrTransaction = False
-                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                        logging.error('Could not commit transaction')
+                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                        rv = 1
            except Exception: # pylint: disable=broad-exception-caught
                if isinstance(lyrTransaction, str):
                    query = 'ROLLBACK TO ' + lyrTransaction
-                    logging.exception('Exception occured in transaction, %s', query)
+                    logging.exception('Exception occurred within transaction, '
+                                      'rolling back to previous savepoint')
                    logging.debug(query)
                    dso.ExecuteSQL(query)
                elif isinstance(lyrTransaction, bool) and lyrTransaction:
-                    logging.exception('Exception occured in transaction, rolling back')
-                    try:
-                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
-                            logging.error('Could not rollback transaction')
-                    except RuntimeError:
-                        logging.exception('Could not rollback transaction')
+                    logging.exception('Exception occurred within transaction, rolling back')
+                    rollbackTransactionTry(lyr)
                else:
                    traceback.print_exc()
                rv = 1
@@ -422,29 +714,28 @@ def main() -> NoReturn:
                logging.debug(query)
                dso.ExecuteSQL(query)
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
        if dsoTransaction:
            # commit transaction
-            logging.debug('Committing transaction')
            dsoTransaction = False
-            if dso.CommitTransaction() != ogr.OGRERR_NONE:
-                logging.error('Could not commit transaction')
+            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                rv = 1
    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
-            logging.exception('Exception occured in transaction, rolling back')
-            try:
-                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not rollback transaction')
-            except RuntimeError:
-                logging.exception('Could not rollback transaction')
+            logging.exception('Exception occurred within transaction, rolling back')
+            rollbackTransactionTry(dso)
        else:
            traceback.print_exc()
-        rv = 1
+        sys.exit(1)
-    dso = None
-    srs = None
-    extent = None
+    finally:
+        lyr_sourcecache = None
+        dso = None
+        srs = None
+        extent = None
    sys.exit(rv)
gdal.UseExceptions()