authorGuilhem Moulin <guilhem@fripost.org>2025-04-19 13:27:49 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-23 12:09:24 +0200
commite5e8a6548ef156b785aae1b8a37fe71f26146061 (patch)
treeff774b2dbccb133f0f75d4731de9e302dfcc59bf
parentc33799f69e7eb42cb0ab4735c7e878d74faca16a (diff)
webmap-import: Add a cache layer and store the source file's last modification time.
That way we can avoid the expensive unpack+import when the source file(s) have not been updated since the last run. The check can be bypassed with the new `--force` flag.

We use a sequence for the FIDs (primary key) and a UNIQUE constraint on the triplet (source_path, archive_member, layername), since GDAL doesn't support multi-column primary keys.

To avoid races between the stat(2) calls, gdal.OpenEx(), and updates from concurrent `webmap-download` runs, we place a shared lock on the downloaded files. One could resort to some tricks to eliminate the race between the first two, but there is also some value in having consistency during the entire execution of the script (a single source file can be used by multiple layers, for instance, and in that case it makes sense to use the very same file for all of them).

We also intersperse dso.FlushCache() calls between _importSource() calls in order to force the PG driver to call EndCopy(), so that errors are detected and a rollback is triggered when _importSource() fails.
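For illustration only, a minimal sketch (not part of the patch) of the skip logic described above, assuming a hypothetical helper needs_reimport(): stat(2) the downloaded source under a shared flock() and compare its mtime_ns against the cached value before deciding whether to re-import. The patch itself locks a separate per-path lockfile (shared with webmap-download) rather than the source file, and keeps the lock for the whole run.

    import os
    from fcntl import flock, LOCK_SH
    from stat import S_ISREG

    def needs_reimport(path: str, cached_mtime_ns: int | None,
                       force: bool = False) -> bool:
        """Return True when the source file should be unpacked and imported again."""
        if force:
            return True
        try:
            # Hold a shared lock while stat(2)ing so the file is not swapped out
            # underneath us; the patch uses a dedicated lockfile for this instead.
            fd = os.open(path, os.O_RDONLY | os.O_CLOEXEC)
        except FileNotFoundError:
            return True
        try:
            flock(fd, LOCK_SH)
            st = os.fstat(fd)
            if not S_ISREG(st.st_mode):
                return True
            return cached_mtime_ns is None or st.st_mtime_ns != cached_mtime_ns
        finally:
            os.close(fd)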
-rw-r--r--common.py16
-rw-r--r--config.yml2
-rw-r--r--import_source.py69
-rw-r--r--schema.sql84
-rwxr-xr-xwebmap-download7
-rwxr-xr-xwebmap-import361
6 files changed, 495 insertions, 44 deletions
diff --git a/common.py b/common.py
index eab9dd5..b1d14ba 100644
--- a/common.py
+++ b/common.py
@@ -27,6 +27,7 @@ from stat import S_ISDIR
import math
import logging
from typing import Any, Optional, Never
+from hashlib import sha256
from xdg.BaseDirectory import xdg_config_home
import yaml
@@ -143,6 +144,11 @@ def parse_config(path : Optional[Path] = None,
return config
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+ """Return the name of the lockfile associated with a source path."""
+ return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
"""Format a number of bytes to a SI unit"""
@@ -174,6 +180,16 @@ def escape_identifier(identifier : str) -> str:
# SQL:1999 delimited identifier
return '"' + identifier.replace('"', '""') + '"'
+def escape_literal_str(literal : str) -> str:
+ """Escape the given character string literal, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+ if literal is None or '\x00' in literal:
+ raise RuntimeError(f'Invalid literal "{literal}"')
+
+ # SQL:1999 character string literal
+ return '\'' + literal.replace('\'', '\'\'') + '\''
+
######
# The function definitions below are taken from cpython's source code
diff --git a/config.yml b/config.yml
index b13f363..2d6a526 100644
--- a/config.yml
+++ b/config.yml
@@ -108,6 +108,8 @@ dataset:
open-options-publish:
USER: webmap_guest
+ sourcecache-layername: public.sourcecache
+
# Optional dictionary of default layer creation options, cf.
# https://gdal.org/drivers/vector/pg.html#layer-creation-options or
# https://gdal.org/drivers/vector/gpkg.html#layer-creation-options
diff --git a/import_source.py b/import_source.py
index e30a245..1fa754c 100644
--- a/import_source.py
+++ b/import_source.py
@@ -37,7 +37,7 @@ from osgeo.gdalconst import (
)
from osgeo import gdalconst
-from common import BadConfiguration, escape_identifier
+from common import BadConfiguration, escape_identifier, escape_literal_str
from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
@@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
+def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
+ """Get the unique triplet (source_path, archive_member, layername)
+ for the layer source which can be used for attribute filters and
+ lookups."""
+ importdef = source['import']
+ sourcedef = source['source']
+ source_path = sourcedef['path']
+ archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
+ layername = importdef.get('layername', '')
+ return (source_path, archive_member, layername)
+
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
+ dso : gdal.Dataset,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers"""
+ mtimes : dict[str, int|None]|None = None,
+ lyr_sourcecache : ogr.Layer|None = None,
+ extent : ogr.Geometry|None = None) -> bool:
+ """Import source layers."""
+ bChanged = False
for source in sources:
_importSource(lyr_dst, **source['source'],
args=source['import'],
cachedir=cachedir,
extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
+ # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
+ dso.FlushCache()
+
+ if lyr_sourcecache is None:
+ bChanged = True
+ continue
+
+ source_path, archive_member, layername = getSourceCacheKey(source)
+ attributeFilter = ('(source_path,archive_member,layername) = ('
+ + escape_literal_str(source_path) + ','
+ + escape_literal_str(archive_member) + ','
+ + escape_literal_str(layername) + ')')
+ lyr_sourcecache.SetAttributeFilter(attributeFilter)
+
+ feature = lyr_sourcecache.GetNextFeature()
+ if feature is not None:
+ logging.debug('Updating existing feature in source cache for %s', attributeFilter)
+ update = True
+ assert lyr_sourcecache.GetNextFeature() is None
+ else:
+ logging.debug('Creating new feature in source cache for %s', attributeFilter)
+ update = False
+ feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
+ feature.SetFieldString(0, source_path)
+ feature.SetFieldString(1, archive_member)
+ feature.SetFieldString(2, layername)
+
+ mtime_ns = mtimes[source_path]
+ if mtime_ns is None:
+ feature.SetFieldNull(3)
+ else:
+ feature.SetFieldInteger64(3, mtime_ns)
+
+ if update:
+ # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+ if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not update feature in sourcecache')
+ else:
+ if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not create new feature in sourcecache')
+
+ # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
+ dso.FlushCache()
+ bChanged = True # TODO fingerprint the features to detect changes
+
+ return bChanged
+
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
path : str = '/nonexistent',
diff --git a/schema.sql b/schema.sql
index 307acac..f56f711 100644
--- a/schema.sql
+++ b/schema.sql
@@ -2,8 +2,8 @@
-- PostgreSQL database dump
--
--- Dumped from database version 15.8 (Debian 15.8-0+deb12u1)
--- Dumped by pg_dump version 15.8 (Debian 15.8-0+deb12u1)
+-- Dumped from database version 15.12 (Debian 15.12-0+deb12u2)
+-- Dumped by pg_dump version 15.12 (Debian 15.12-0+deb12u2)
SET statement_timeout = 0;
SET lock_timeout = 0;
@@ -2756,6 +2756,49 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
--
+-- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+--
+
+CREATE TABLE public.sourcecache (
+ ogc_fid bigint NOT NULL,
+ source_path character varying(2047) NOT NULL,
+ archive_member character varying(2047) NOT NULL,
+ layername character varying(255) NOT NULL,
+ mtime_ns bigint
+);
+
+
+ALTER TABLE public.sourcecache OWNER TO webmap_import;
+
+--
+-- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+--
+
+COMMENT ON TABLE public.sourcecache IS 'Metadata cache for source files';
+
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+--
+
+CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+--
+
+ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+
+
+--
-- Name: dammar ogc_fid; Type: DEFAULT; Schema: postgis; Owner: webmap_import
--
@@ -3127,6 +3170,13 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT ne
--
+-- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+
+
+--
-- Name: dammar dammar_DammID_key; Type: CONSTRAINT; Schema: postgis; Owner: webmap_import
--
@@ -4007,6 +4057,22 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
--
+-- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+ ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+
+
+--
+-- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+ ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+
+
+--
-- Name: dammar_wkb_geometry_geom_idx; Type: INDEX; Schema: postgis; Owner: webmap_import
--
@@ -5157,6 +5223,20 @@ GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap
--
+-- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+
+
+--
+-- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+
+
+--
-- PostgreSQL database dump complete
--
diff --git a/webmap-download b/webmap-download
index edb624e..05aa2c4 100755
--- a/webmap-download
+++ b/webmap-download
@@ -44,12 +44,11 @@ import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
-from hashlib import sha256
from typing import Optional, NoReturn, Never
import requests
import common
-from common import BadConfiguration
+from common import BadConfiguration, getSourcePathLockFileName
def download_trystream(url : str, **kwargs) -> requests.Response:
"""GET a url, trying a number of times. Return immediately after the
@@ -309,9 +308,9 @@ def main() -> NoReturn:
# place an exclusive lock on a lockfile as the destination can be used by other layers
# hence might be updated in parallel
if lockdir_fd is not None:
- lockfile = sha256(dest.encode('utf-8')).hexdigest() + '.lck'
+ lockfile = getSourcePathLockFileName(dest)
# use O_TRUNC to bump lockfile's mtime
- lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644,
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o664,
dir_fd=lockdir_fd)
try:
if lockdir_fd is not None:
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
# pylint: disable=invalid-name, missing-module-docstring, fixme
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
+from stat import S_ISREG
import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
+from datetime import datetime
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
from osgeo import gdalconst
import common
-from common import BadConfiguration, escape_identifier
+from common import (
+ BadConfiguration,
+ escape_identifier,
+ escape_literal_str,
+ getSourcePathLockFileName
+)
from common_gdal import (
gdalVersionMin,
gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
createOutputLayer,
validateOutputLayer,
clearLayer,
- importSources
+ importSources,
+ getSourceCacheKey
)
def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
for layername in toremove:
layers.pop(layername)
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ isNullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the source cache layer/table."""
+ n = defn.GetFieldCount()
+ if idx >= n:
+ return False
+ defn = defn.GetFieldDefn(idx)
+
+ b = True
+ name2 = defn.GetName()
+ if name2 != name:
+ logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+ idx, name2, name)
+ b = False
+
+ if isNullable is not None and defn.IsNullable() != isNullable:
+ # non-fatal
+ logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+ typ2 = defn.GetType()
+ if typ2 != typ:
+ logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+ idx, name2,
+ ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+ b = False
+
+ subtyp2 = defn.GetSubType()
+ if subtyp2 != subtyp:
+ logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+ 'ignoring cache', idx, name2,
+ ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+ b = False
+
+ width2 = defn.GetWidth()
+ if width2 != 0 and (width == 0 or width2 < width):
+ # non-fatal
+ logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+ idx, name2, width2, width)
+ return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+ force : bool = False) -> ogr.Layer|None:
+ """Get the source cache layer/table and validate it."""
+ if name is None:
+ return None
+
+ lyr = ds.GetLayerByName(name)
+ if lyr is None:
+ if not force: # show a warning if args.force is not set
+ logging.warning('Table "%s" does not exist, implying --force', name)
+ return None
+
+# if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+# gdalVersionMin(maj=3, min=9) and
+# lyr.TestCapability(ogr.OLCUpdateFeature)):
+# # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+# # which was added in 3.9
+# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+# 'ignoring cache', name)
+# return None
+
+ defn = lyr.GetLayerDefn()
+ fields = [
+ { 'name': 'source_path', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 2047 },
+ { 'name': 'archive_member', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 2047 },
+ { 'name': 'layername', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 255 },
+ { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+ 'isNullable': True },
+ ]
+ m = len(fields)
+ n = defn.GetFieldCount()
+ if n < m:
+ logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+ name, n, m)
+ elif n != m:
+ logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+ if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+ return None
+
+ n = defn.GetGeomFieldCount()
+ if n > 0:
+ geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+ for i in range(n) ]
+ logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+ name, n, ', '.join(geomFieldNames))
+
+ if gdalVersionMin(maj=3, min=5):
+ style = lyr.GetStyleTable()
+ if style is not None:
+ logging.warning('Source cache layer/table "%s" has a style table "%s"',
+ name, style.GetLastStyleName())
+ return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+ cachedir : Optional[Path] = None) -> dict[str,int|None]:
+ """Return a dictionary mapping each source path to its last modification
+ time (in nanoseconds), or None if said source path is not a regular file."""
+ mtimes_ns = {}
+ for source in sources:
+ # the same source_path can be used multiple times, stat(2) only once
+ source_path = source['source']['path']
+ mtimes_ns[source_path] = None
+ for source_path in mtimes_ns:
+ path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ mtimes_ns[source_path] = st.st_mtime_ns
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ pass
+ return mtimes_ns
+
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+ mtimes_ns : dict[str,int|None]) -> bool:
+ """Return a boolean indicating whether the source cache layer/table is up to
+ date with respect to the dictionary mapping each source path to its last
+ modification time. That is, every triplet (source_path, archive_member,
+ layername) needs to be present in cache, and the corresponding mtime_ns
+ needs to match the stat(2) output of the source file on disk."""
+ ks = set()
+ for source in sources:
+ source_path, archive_member, layername = k = getSourceCacheKey(source)
+ if k in ks:
+ raise BadConfiguration(f'Duplicate key {k}')
+ ks.add(k)
+
+ if len(ks) == 0:
+ return False
+
+ attributeFilter = []
+ for source_path, archive_member, layername in ks:
+ attributeFilter.append('(' + escape_literal_str(source_path) + ','
+ + escape_literal_str(archive_member) + ','
+ + escape_literal_str(layername) + ')')
+ if len(attributeFilter) == 1:
+ attributeFilter = '= ' + attributeFilter[0]
+ else:
+ attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+ attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+ logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+ lyr.SetAttributeFilter(attributeFilter)
+
+ cache = {}
+ feature = lyr.GetNextFeature()
+ while feature is not None:
+ k = (
+ feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+ feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+ feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+ )
+ if any(k0 is None for k0 in k):
+ # no column in the key may be NULL
+ raise RuntimeError(f'Bad key {k}')
+ if k in cache:
+ raise RuntimeError(f'Duplicate key {k}')
+ cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+ feature = lyr.GetNextFeature()
+
+ for k in ks:
+ a = cache.get(k, None)
+ b = mtimes_ns[k[0]]
+ if a is None or b is None or a != b:
+ return True
+
+ for source_path in sorted({ p for p,_,_ in ks }):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtimes_ns[source_path] // 1000)/1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+ """Place shared locks on each source path and return their respective file
+ descriptors. We could do that one layerdef at a time (one output layer at a
+ time) to reduce the time during which the sources are prevented from being
+ updated/downloaded, but there is some value in having consistency across the
+ whole import process."""
+ lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+ try:
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ lockfile = getSourcePathLockFileName(source_path)
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+ dir_fd=lockdir_fd)
+ logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+ source_path, lockfile)
+ flock(lock_fd, LOCK_SH)
+ ret[source_path] = lock_fd
+ return ret
+ finally:
+ try:
+ os.close(lockdir_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not close lockdir')
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+ """Release shared locks on the source paths. Closed FDs are removed from
+ the dictionary in place."""
+ toremove = set()
+ for path, lock_fd in lock_fds.items():
+ try:
+ os.close(lock_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not release lock for %s', path)
+ else:
+ logging.debug('Released lock for %s', path)
+ toremove.add(path)
+ for path in toremove:
+ lock_fds.pop(path)
+
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+ """Either commit or rollback the transaction, depending on a condition
+ indicating whether there are pending new changes to be committed."""
+ if commit:
+ logging.debug('Committing transaction')
+ if obj.CommitTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not commit transaction')
+ else:
+ logging.info('No changes detected, rolling back transaction')
+ # don't call rollbackTransactionTry() as we don't want to catch exceptions
+ if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not rollback transaction')
+ return False
+
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+ """Try to rollback the current transaction on the dataset or layer, ignoring
+ any exception but logging errors and returning a boolean indicating
+ success."""
+ try:
+ if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not rollback transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction')
+ return False
+
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
parser.add_argument('--debug', action='count', default=0,
help=argparse.SUPPRESS)
parser.add_argument('--lockfile', default=None,
- help='obtain an exclusive lock before starting unpacking and importing')
+ help='obtain an exclusive lock before processing')
+ parser.add_argument('--lockdir-sources', default=None,
+ help='optional directory for lock files on source paths')
+ parser.add_argument('--force', default=False, action='store_true',
+ help='import even if no new changes were detected')
parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
args = parser.parse_args()
@@ -313,8 +577,8 @@ def main() -> NoReturn:
dso = openOutputDS(config['dataset'])
validate_schema(layers,
- drvo=dso.GetDriver(),
- lco_defaults=config['dataset'].get('create-layer-options', None))
+ drvo=dso.GetDriver(),
+ lco_defaults=config['dataset'].get('create-layer-options', None))
# get configured Spatial Reference System and extent
srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
cachedir = Path(args.cachedir) if args.cachedir is not None else None
+ if args.lockdir_sources is None:
+ sourcePathLocks = None
+ else:
+ sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+ lockdir=args.lockdir_sources)
if (dso.TestCapability(ogr.ODsCTransactions) and
dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@ def main() -> NoReturn:
dso.GetDriver().ShortName)
dsoTransaction = False
+ # get source cache layer/table
+ lyr_sourcecache = getSourceCacheLayer(dso,
+ config['dataset'].get('sourcecache-layername', None),
+ force=args.force)
+
+ bChanged = False
rv = 0
try:
for layername, layerdef in layers.items():
@@ -360,9 +635,18 @@ def main() -> NoReturn:
if not lyr.TestCapability(ogr.OLCSequentialWrite):
raise RuntimeError(f'Output layer "{layername}" has no working '
'CreateFeature() method')
- validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
sources = layerdef['sources']
+ if lyr_sourcecache is None:
+ sourcesMtimeNS = None
+ else:
+ sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+ if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+ mtimes_ns=sourcesMtimeNS)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ continue
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
# setup output field mapping in the sources dictionary
setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
lyrTransaction = False
try:
- clearLayer(dso, lyr)
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
description = layerdef.get('description', None)
if (description is not None and
lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
logging.warning('Could not set description metadata')
- importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+ bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+ mtimes=sourcesMtimeNS,
+ lyr_sourcecache=lyr_sourcecache,
+ cachedir=cachedir, extent=extent)
+ bChanged = bChanged or bChanged0
- if isinstance(lyrTransaction, bool) and lyrTransaction:
- # commit transaction
- logging.debug('Committing transaction')
+ if isinstance(lyrTransaction, str):
+ if not (bChanged0 or args.force):
+ logging.info('No changes detected, rolling back to previous savepoint')
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.debug(query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not %s', query)
+ rv = 1
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
lyrTransaction = False
- if lyr.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
+ if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
rv = 1
except Exception: # pylint: disable=broad-exception-caught
if isinstance(lyrTransaction, str):
query = 'ROLLBACK TO ' + lyrTransaction
- logging.exception('Exception occured in transaction, %s', query)
+ logging.exception('Exception occurred within transaction, '
+ 'rolling back to previous savepoint')
logging.debug(query)
dso.ExecuteSQL(query)
elif isinstance(lyrTransaction, bool) and lyrTransaction:
- logging.exception('Exception occured in transaction, rolling back')
- try:
- if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
+ logging.exception('Exception occurred within transaction, rolling back')
+ rollbackTransactionTry(lyr)
else:
traceback.print_exc()
rv = 1
@@ -422,29 +714,28 @@ def main() -> NoReturn:
logging.debug(query)
dso.ExecuteSQL(query)
+ if sourcePathLocks is not None:
+ releaseSourcePathLocks(sourcePathLocks)
+
if dsoTransaction:
# commit transaction
- logging.debug('Committing transaction')
dsoTransaction = False
- if dso.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
+ if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
rv = 1
except Exception: # pylint: disable=broad-exception-caught
if dsoTransaction:
- logging.exception('Exception occured in transaction, rolling back')
- try:
- if dso.RollbackTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
+ logging.exception('Exception occurred within transaction, rolling back')
+ rollbackTransactionTry(dso)
else:
traceback.print_exc()
- rv = 1
+ sys.exit(1)
- dso = None
- srs = None
- extent = None
+ finally:
+ lyr_sourcecache = None
+ dso = None
+ srs = None
+ extent = None
sys.exit(rv)
gdal.UseExceptions()