From e5e8a6548ef156b785aae1b8a37fe71f26146061 Mon Sep 17 00:00:00 2001
From: Guilhem Moulin
Date: Sat, 19 Apr 2025 13:27:49 +0200
Subject: webmap-import: Add a cache layer and store the source file's last
 modification time.

That way we can avoid the expensive unpack+import when the source
file(s) have not been updated since the last run.  The check can be
bypassed with a new flag `--force`.

We use a sequence for the FIDs (primary key) and a UNIQUE constraint on
triplets (source_path, archive_member, layername), as GDAL doesn't
support multicolumn primary keys.

To avoid races between the stat(2) calls, gdal.OpenEx() and updates via
`webmap-download` runs, we place a shared lock on the downloaded files.
One could resort to some tricks to eliminate the race between the first
two, but there is also some value in having consistency during the
entire execution of the script (a single source file can be used by
multiple layers, for instance, and in that case it makes sense to use
the very same file for all layers).

We also intersperse dso.FlushCache() calls between _importSource()
calls in order to force the PG driver to call EndCopy(), so that errors
are detected and a rollback is triggered when _importSource() fails.
---
 common.py        |  16 +++
 config.yml       |   2 +
 import_source.py |  69 ++++++++++-
 schema.sql       |  84 ++++++++++++-
 webmap-download  |   7 +-
 webmap-import    | 361 +++++++++++++++++++++++++++++++++++++++++++++++++------
 6 files changed, 495 insertions(+), 44 deletions(-)

diff --git a/common.py b/common.py
index eab9dd5..b1d14ba 100644
--- a/common.py
+++ b/common.py
@@ -27,6 +27,7 @@ from stat import S_ISDIR
 import math
 import logging
 from typing import Any, Optional, Never
+from hashlib import sha256
 
 from xdg.BaseDirectory import xdg_config_home
 import yaml
@@ -143,6 +144,11 @@ def parse_config(path : Optional[Path] = None,
 
     return config
 
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+    """Return the name of the lockfile associated with a source path."""
+    return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
 def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
     """Format a number of bytes to a SI unit"""
 
@@ -174,6 +180,16 @@ def escape_identifier(identifier : str) -> str:
     # SQL:1999 delimited identifier
     return '"' + identifier.replace('"', '""') + '"'
 
+def escape_literal_str(literal : str) -> str:
+    """Escape the given character string literal, cf.
+    swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+    if literal is None or '\x00' in literal:
+        raise RuntimeError(f'Invalid literal "{literal}"')
+
+    # SQL:1999 character string literal
+    return '\'' + literal.replace('\'', '\'\'') + '\''
+
 
 ######
 # The function definitions below are taken from cpython's source code
diff --git a/config.yml b/config.yml
index b13f363..2d6a526 100644
--- a/config.yml
+++ b/config.yml
@@ -108,6 +108,8 @@ dataset:
   open-options-publish:
     USER: webmap_guest
 
+  sourcecache-layername: public.sourcecache
+
   # Optional dictionary of default layer creation options, cf.
   # https://gdal.org/drivers/vector/pg.html#layer-creation-options or
   # https://gdal.org/drivers/vector/gpkg.html#layer-creation-options
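Not part of the patch, but a minimal sketch of how the helper added to common.py above is meant to be used: both webmap-download and webmap-import derive the lock-file name from the source path alone, so the exclusive lock taken by the downloader and the shared lock taken by the importer land on the same file.  The sample path is hypothetical.

    from hashlib import sha256

    def getSourcePathLockFileName(path: str) -> str:
        # same definition as the helper added to common.py in this patch
        return sha256(path.encode('utf-8')).hexdigest() + '.lck'

    # hypothetical source path, possibly shared by several output layers
    source_path = 'lst/data.zip'

    # deterministic: every run and every program computes the same name,
    # so flock(2) calls on the resulting file serialise downloader and importer
    print(getSourcePathLockFileName(source_path))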
diff --git a/import_source.py b/import_source.py
index e30a245..1fa754c 100644
--- a/import_source.py
+++ b/import_source.py
@@ -37,7 +37,7 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 
-from common import BadConfiguration, escape_identifier
+from common import BadConfiguration, escape_identifier, escape_literal_str
 from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
 
 def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
@@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str],
             logging.debug('Ignoring archive member %s', name)
     return members
 
+def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
+    """Get the unique triplet (source_path, archive_member, layername)
+    for the layer source which can be used for attribute filters and
+    lookups."""
+    importdef = source['import']
+    sourcedef = source['source']
+    source_path = sourcedef['path']
+    archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
+    layername = importdef.get('layername', '')
+    return (source_path, archive_member, layername)
+
 # pylint: disable-next=dangerous-default-value
 def importSources(lyr_dst : ogr.Layer,
+                  dso : gdal.Dataset,
                   sources : dict[str,Any] = {},
                   cachedir : Path|None = None,
-                  extent : ogr.Geometry|None = None) -> None:
-    """Import source layers"""
+                  mtimes : dict[str, int|None]|None = None,
+                  lyr_sourcecache : ogr.Layer|None = None,
+                  extent : ogr.Geometry|None = None) -> bool:
+    """Import source layers."""
+    bChanged = False
     for source in sources:
         _importSource(lyr_dst, **source['source'], args=source['import'],
                       cachedir=cachedir, extent=extent)
+        # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
+        # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
+        dso.FlushCache()
+
+        if lyr_sourcecache is None:
+            bChanged = True
+            continue
+
+        source_path, archive_member, layername = getSourceCacheKey(source)
+        attributeFilter = ('(source_path,archive_member,layername) = ('
+                           + escape_literal_str(source_path) + ','
+                           + escape_literal_str(archive_member) + ','
+                           + escape_literal_str(layername) + ')')
+        lyr_sourcecache.SetAttributeFilter(attributeFilter)
+
+        feature = lyr_sourcecache.GetNextFeature()
+        if feature is not None:
+            logging.debug('Updating existing feature in source cache for %s', attributeFilter)
+            update = True
+            assert lyr_sourcecache.GetNextFeature() is None
+        else:
+            logging.debug('Creating new feature in source cache for %s', attributeFilter)
+            update = False
+            feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
+            feature.SetFieldString(0, source_path)
+            feature.SetFieldString(1, archive_member)
+            feature.SetFieldString(2, layername)
+
+        mtime_ns = mtimes[source_path]
+        if mtime_ns is None:
+            feature.SetFieldNull(3)
+        else:
+            feature.SetFieldInteger64(3, mtime_ns)
+
+        if update:
+            # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+            if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
+                raise RuntimeError('Could not update feature in sourcecache')
+        else:
+            if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
+                raise RuntimeError('Could not create new feature in sourcecache')
+
+        # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
+        dso.FlushCache()
+        bChanged = True # TODO fingerprint the features to detect changes
+
+    return bChanged
+
 # pylint: disable-next=dangerous-default-value
 def _importSource(lyr : ogr.Layer,
                   path : str = '/nonexistent',
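For illustration only, outside the patch: the attribute filter that importSources() builds for its source-cache lookup, shown here with hypothetical values.  escape_literal_str() doubles any single quote per SQL:1999, so odd characters in a path or layer name cannot break the filter expression.

    def escape_literal_str(literal: str) -> str:
        # same SQL:1999 escaping as the helper added to common.py in this patch
        if literal is None or '\x00' in literal:
            raise RuntimeError(f'Invalid literal "{literal}"')
        return '\'' + literal.replace('\'', '\'\'') + '\''

    # hypothetical cache key for a layer read from an archive member
    source_path, archive_member, layername = 'lst/data.zip', "shp/nature's.shp", 'reservat'

    attributeFilter = ('(source_path,archive_member,layername) = ('
                       + escape_literal_str(source_path) + ','
                       + escape_literal_str(archive_member) + ','
                       + escape_literal_str(layername) + ')')
    print(attributeFilter)
    # (source_path,archive_member,layername) = ('lst/data.zip','shp/nature''s.shp','reservat')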
diff --git a/schema.sql b/schema.sql
index 307acac..f56f711 100644
--- a/schema.sql
+++ b/schema.sql
@@ -2,8 +2,8 @@
 -- PostgreSQL database dump
 --
 
--- Dumped from database version 15.8 (Debian 15.8-0+deb12u1)
--- Dumped by pg_dump version 15.8 (Debian 15.8-0+deb12u1)
+-- Dumped from database version 15.12 (Debian 15.12-0+deb12u2)
+-- Dumped by pg_dump version 15.12 (Debian 15.12-0+deb12u2)
 
 SET statement_timeout = 0;
 SET lock_timeout = 0;
@@ -2755,6 +2755,49 @@ ALTER TABLE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNER TO webmap_import;
 ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vindkraftverk".ogc_fid;
 
 
+--
+-- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+--
+
+CREATE TABLE public.sourcecache (
+    ogc_fid bigint NOT NULL,
+    source_path character varying(2047) NOT NULL,
+    archive_member character varying(2047) NOT NULL,
+    layername character varying(255) NOT NULL,
+    mtime_ns bigint
+);
+
+
+ALTER TABLE public.sourcecache OWNER TO webmap_import;
+
+--
+-- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+--
+
+COMMENT ON TABLE public.sourcecache IS 'Metadata cache for source files';
+
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+--
+
+CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+--
+
+ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+
+
 --
 -- Name: dammar ogc_fid; Type: DEFAULT; Schema: postgis; Owner: webmap_import
 --
@@ -3126,6 +3169,13 @@ ALTER TABLE ONLY postgis."vbk:projekteringsomraden" ALTER COLUMN ogc_fid SET DEF
 ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT nextval('postgis."vbk:vindkraftverk_ogc_fid_seq"'::regclass);
 
 
+--
+-- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+
+
 --
 -- Name: dammar dammar_DammID_key; Type: CONSTRAINT; Schema: postgis; Owner: webmap_import
 --
@@ -4006,6 +4056,22 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
     ADD CONSTRAINT "vbk:vindkraftverk_pkey" PRIMARY KEY (ogc_fid);
 
 
+--
+-- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+    ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+
+
+--
+-- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+    ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+
+
 --
 -- Name: dammar_wkb_geometry_geom_idx; Type: INDEX; Schema: postgis; Owner: webmap_import
 --
@@ -5156,6 +5222,20 @@ GRANT SELECT ON TABLE postgis."vbk:vindkraftverk" TO webmap_guest;
 GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap_guest;
 
 
+--
+-- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+
+
+--
+-- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+
+
 --
 -- PostgreSQL database dump complete
 --
diff --git a/webmap-download b/webmap-download
index edb624e..05aa2c4 100755
--- a/webmap-download
+++ b/webmap-download
@@ -44,12 +44,11 @@ import argparse
 import itertools
 from pathlib import Path
 from email.utils import parsedate_to_datetime, formatdate
-from hashlib import sha256
 from typing import Optional, NoReturn, Never
 
 import requests
 
 import common
-from common import BadConfiguration
+from common import BadConfiguration, getSourcePathLockFileName
 
 def download_trystream(url : str, **kwargs) -> requests.Response:
     """GET a url, trying a number of times. Return immediately after the
@@ -309,9 +308,9 @@ def main() -> NoReturn:
             # place an exclusive lock on a lockfile as the destination can be used by other layers
             # hence might be updated in parallel
             if lockdir_fd is not None:
-                lockfile = sha256(dest.encode('utf-8')).hexdigest() + '.lck'
+                lockfile = getSourcePathLockFileName(dest)
                 # use O_TRUNC to bump lockfile's mtime
-                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644,
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o664,
                                   dir_fd=lockdir_fd)
             try:
                 if lockdir_fd is not None:
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
 
 # pylint: disable=invalid-name, missing-module-docstring, fixme
 
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
 import os
+from stat import S_ISREG
 import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 
 import common
-from common import BadConfiguration, escape_identifier
+from common import (
+    BadConfiguration,
+    escape_identifier,
+    escape_literal_str,
+    getSourcePathLockFileName
+)
 from common_gdal import (
     gdalVersionMin,
     gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
     createOutputLayer,
     validateOutputLayer,
     clearLayer,
-    importSources
+    importSources,
+    getSourceCacheKey
 )
 
 def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
     for layername in toremove:
         layers.pop(layername)
 
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+                             name : str,
+                             typ : int,
+                             subtyp : int = ogr.OFSTNone,
+                             width : int = 0,
+                             isNullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the source cache layer/table."""
+    n = defn.GetFieldCount()
+    if idx >= n:
+        return False
+    defn = defn.GetFieldDefn(idx)
+
+    b = True
+    name2 = defn.GetName()
+    if name2 != name:
+        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+                        idx, name2, name)
+        b = False
+
+    if isNullable is not None and defn.IsNullable() != isNullable:
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+    typ2 = defn.GetType()
+    if typ2 != typ:
+        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+                        idx, name2,
+                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+        b = False
+
+    subtyp2 = defn.GetSubType()
+    if subtyp2 != subtyp:
+        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+                        'ignoring cache', idx, name2,
+                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+        b = False
+
+    width2 = defn.GetWidth()
+    if width2 != 0 and (width == 0 or width2 < width):
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+                        idx, name2, width2, width)
+    return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+                        force : bool = False) -> ogr.Layer|None:
+    """Get the source cache layer/table and validate it."""
+    if name is None:
+        return None
+
+    lyr = ds.GetLayerByName(name)
+    if lyr is None:
+        if not force: # show a warning if args.force is not set
+            logging.warning('Table "%s" does not exist, implying --force', name)
+        return None
+
+#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+#            gdalVersionMin(maj=3, min=9) and
+#            lyr.TestCapability(ogr.OLCUpdateFeature)):
+#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+#        # which was added in 3.9
+#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+#                        'ignoring cache', name)
+#        return None
+
+    defn = lyr.GetLayerDefn()
+    fields = [
+        { 'name': 'source_path',    'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'archive_member', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'layername',      'typ': ogr.OFTString,
+          'isNullable': False, 'width': 255 },
+        { 'name': 'mtime_ns',       'typ': ogr.OFTInteger64,
+          'isNullable': True },
+    ]
+    m = len(fields)
+    n = defn.GetFieldCount()
+    if n < m:
+        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+                        name, n, m)
+    elif n != m:
+        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+        return None
+
+    n = defn.GetGeomFieldCount()
+    if n > 0:
+        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+                           for i in range(n) ]
+        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+                        name, n, ', '.join(geomFieldNames))
+
+    if gdalVersionMin(maj=3, min=5):
+        style = lyr.GetStyleTable()
+        if style is not None:
+            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+                            name, style.GetLastStyleName())
+    return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
+    """Return a dictionary mapping each source path to its last modification
+    time (in nanoseconds), or None if said source path is not a regular file."""
+    mtimes_ns = {}
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        mtimes_ns[source_path] = None
+    for source_path in mtimes_ns:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            pass
+    return mtimes_ns
+
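Not part of the patch: a small sketch of the staleness test that the values returned by getSourcesMtimeNS() above feed into isSourceCacheDirtyTime() below, together with the nanosecond-to-datetime conversion the code comments reference (datetime.fromtimestamp() has no nanosecond input, hence the division).  The path and cached value are hypothetical.

    import os
    from datetime import datetime

    # hypothetical: mtime_ns stored in public.sourcecache by the previous run,
    # and the source file as it currently sits in the download cache
    cached_mtime_ns = 1713526069123456789
    st = os.stat('/var/cache/webmap/lst/data.zip')  # hypothetical cachedir path

    # any difference (or a missing cache row) marks the source as dirty
    dirty = cached_mtime_ns != st.st_mtime_ns

    # truncate to microseconds before building a datetime, as the import script does
    mtime = (st.st_mtime_ns // 1000) / 1000000.
    print(dirty, datetime.fromtimestamp(mtime).astimezone().isoformat(timespec='seconds'))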
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+                           mtimes_ns : dict[str,int|None]) -> bool:
+    """Return a boolean indicating whether the source cache layer/table is out
+    of date with respect to the dictionary mapping each source path to its last
+    modification time.  That is, return False only if every triplet
+    (source_path, archive_member, layername) is present in the cache and the
+    corresponding mtime_ns matches the stat(2) output of the source file on
+    disk."""
+    ks = set()
+    for source in sources:
+        source_path, archive_member, layername = k = getSourceCacheKey(source)
+        if k in ks:
+            raise BadConfiguration(f'Duplicate key {k}')
+        ks.add(k)
+
+    if len(ks) == 0:
+        return False
+
+    attributeFilter = []
+    for source_path, archive_member, layername in ks:
+        attributeFilter.append('(' + escape_literal_str(source_path) + ','
+                                   + escape_literal_str(archive_member) + ','
+                                   + escape_literal_str(layername) + ')')
+    if len(attributeFilter) == 1:
+        attributeFilter = '= ' + attributeFilter[0]
+    else:
+        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+    lyr.SetAttributeFilter(attributeFilter)
+
+    cache = {}
+    feature = lyr.GetNextFeature()
+    while feature is not None:
+        k = (
+            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        )
+        if any(k0 is None for k0 in k):
+            # no column in the key may be NULL
+            raise RuntimeError(f'Bad key {k}')
+        if k in cache:
+            raise RuntimeError(f'Duplicate key {k}')
+        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+        feature = lyr.GetNextFeature()
+
+    for k in ks:
+        a = cache.get(k, None)
+        b = mtimes_ns[k[0]]
+        if a is None or b is None or a != b:
+            return True
+
+    for source_path in sorted({ p for p,_,_ in ks }):
+        # XXX datetime.fromtimestamp() doesn't support nanosecond input
+        # https://github.com/python/cpython/issues/59648
+        mtime = (mtimes_ns[source_path] // 1000)/1000000.
+        dt = datetime.fromtimestamp(mtime)
+        logging.info('Source file %s is unchanged (last modification time %s)',
+                     source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at
+    a time) to reduce the time during which the sources are prevented from
+    being updated/downloaded, but there is some value in having consistency
+    across the whole import process."""
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+
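Not part of the patch: a minimal sketch of the locking protocol that lockSourcePaths() above shares with webmap-download.  The downloader holds an exclusive flock on the per-source lock file while rewriting the source, the importer a shared one for its whole run, so imports never observe a half-written file.  Directory and path are hypothetical.

    import os
    from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
    from common import getSourcePathLockFileName  # helper added by this patch

    lockdir = '/run/lock/webmap'  # hypothetical --lockdir-sources value
    lockfile = os.path.join(lockdir, getSourcePathLockFileName('lst/data.zip'))

    # webmap-download side: exclusive lock while (re)writing the source file
    fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, mode=0o664)
    flock(fd, LOCK_EX)
    # ... download and replace the source file ...
    flock(fd, LOCK_UN)
    os.close(fd)

    # webmap-import side: shared lock while stat(2)ing and gdal.OpenEx()ing,
    # held for the whole run so every layer sees the same file contents
    fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, mode=0o664)
    flock(fd, LOCK_SH)
    # ... stat + unpack + import ...
    os.close(fd)  # closing the descriptor releases the lock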
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+    """Release shared locks on the source paths.  Closed FDs are removed from
+    the dictionary in place."""
+    toremove = set()
+    for path, lock_fd in lock_fds.items():
+        try:
+            os.close(lock_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not release lock for %s', path)
+        else:
+            logging.debug('Released lock for %s', path)
+            toremove.add(path)
+    for path in toremove:
+        lock_fds.pop(path)
+
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+    """Either commit or rollback the transaction, depending on a condition
+    indicating whether there are pending new changes to be committed."""
+    if commit:
+        logging.debug('Committing transaction')
+        if obj.CommitTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not commit transaction')
+    else:
+        logging.info('No changes detected, rolling back transaction')
+        # don't call rollbackTransactionTry() as we don't want to catch exceptions
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    return False
+
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+    """Try to rollback the current transaction on the dataset or layer, ignoring
+    any exception but logging errors and returning a boolean indicating
+    success."""
+    try:
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    except Exception: # pylint: disable=broad-exception-caught
+        logging.exception('Could not rollback transaction')
+    return False
+
 # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
 def main() -> NoReturn:
     common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
     parser.add_argument('--debug', action='count', default=0,
                         help=argparse.SUPPRESS)
     parser.add_argument('--lockfile', default=None,
-                        help='obtain an exclusive lock before starting unpacking and importing')
+                        help='obtain an exclusive lock before processing')
+    parser.add_argument('--lockdir-sources', default=None,
+                        help='optional directory for source path lock files')
+    parser.add_argument('--force', default=False, action='store_true',
+                        help='import even if no new changes were detected')
     parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
 
     args = parser.parse_args()
@@ -313,8 +577,8 @@ def main() -> NoReturn:
     dso = openOutputDS(config['dataset'])
 
     validate_schema(layers,
-                    drvo=dso.GetDriver(),
-                    lco_defaults=config['dataset'].get('create-layer-options', None))
+                     drvo=dso.GetDriver(),
+                     lco_defaults=config['dataset'].get('create-layer-options', None))
 
     # get configured Spatial Reference System and extent
     srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
             createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
 
     cachedir = Path(args.cachedir) if args.cachedir is not None else None
+    if args.lockdir_sources is None:
+        sourcePathLocks = None
+    else:
+        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+                                          lockdir=args.lockdir_sources)
 
     if (dso.TestCapability(ogr.ODsCTransactions) and
             dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@
                       dso.GetDriver().ShortName)
         dsoTransaction = False
 
+    # get source cache layer/table
+    lyr_sourcecache = getSourceCacheLayer(dso,
+                                          config['dataset'].get('sourcecache-layername', None),
+                                          force=args.force)
+
+    bChanged = False
     rv = 0
     try:
         for layername, layerdef in layers.items():
@@ -360,9 +635,18 @@ def main() -> NoReturn:
             if not lyr.TestCapability(ogr.OLCSequentialWrite):
                 raise RuntimeError(f'Output layer "{layername}" has no working '
                                    'CreateFeature() method')
-            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             sources = layerdef['sources']
+            if lyr_sourcecache is None:
+                sourcesMtimeNS = None
+            else:
+                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+                                                             mtimes_ns=sourcesMtimeNS)):
+                    logging.info('Output layer "%s" is up to date, skipping', layername)
+                    continue
+
+            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             # setup output field mapping in the sources dictionary
             setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
                 lyrTransaction = False
 
             try:
-                clearLayer(dso, lyr)
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
 
                 description = layerdef.get('description', None)
                 if (description is not None
                         and lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                     logging.warning('Could not set description metadata')
 
-                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+                                          mtimes=sourcesMtimeNS,
+                                          lyr_sourcecache=lyr_sourcecache,
+                                          cachedir=cachedir, extent=extent)
+                bChanged = bChanged or bChanged0
 
-                if isinstance(lyrTransaction, bool) and lyrTransaction:
-                    # commit transaction
-                    logging.debug('Committing transaction')
+                if isinstance(lyrTransaction, str):
+                    if not (bChanged0 or args.force):
+                        logging.info('No changes detected, rolling back to previous savepoint')
+                        query = 'ROLLBACK TO ' + lyrTransaction
+                        logging.debug(query)
+                        try:
+                            dso.ExecuteSQL(query)
+                        except Exception: # pylint: disable=broad-exception-caught
+                            logging.exception('Could not %s', query)
+                            rv = 1
+                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                     lyrTransaction = False
-                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                        logging.error('Could not commit transaction')
+                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                         rv = 1
 
             except Exception: # pylint: disable=broad-exception-caught
                 if isinstance(lyrTransaction, str):
                     query = 'ROLLBACK TO ' + lyrTransaction
-                    logging.exception('Exception occured in transaction, %s', query)
+                    logging.exception('Exception occurred within transaction, '
+                                      'rolling back to previous savepoint')
                     logging.debug(query)
                     dso.ExecuteSQL(query)
                 elif isinstance(lyrTransaction, bool) and lyrTransaction:
-                    logging.exception('Exception occured in transaction, rolling back')
-                    try:
-                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
-                            logging.error('Could not rollback transaction')
-                    except RuntimeError:
-                        logging.exception('Could not rollback transaction')
+                    logging.exception('Exception occurred within transaction, rolling back')
+                    rollbackTransactionTry(lyr)
                 else:
                     traceback.print_exc()
                 rv = 1
@@ -422,29 +714,28 @@
                     logging.debug(query)
                     dso.ExecuteSQL(query)
 
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
         if dsoTransaction:
             # commit transaction
-            logging.debug('Committing transaction')
             dsoTransaction = False
-            if dso.CommitTransaction() != ogr.OGRERR_NONE:
-                logging.error('Could not commit transaction')
+            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                 rv = 1
 
     except Exception: # pylint: disable=broad-exception-caught
         if dsoTransaction:
-            logging.exception('Exception occured in transaction, rolling back')
-            try:
-                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not rollback transaction')
-            except RuntimeError:
-                logging.exception('Could not rollback transaction')
+            logging.exception('Exception occurred within transaction, rolling back')
+            rollbackTransactionTry(dso)
         else:
             traceback.print_exc()
-        rv = 1
+        sys.exit(1)
 
-    dso = None
-    srs = None
-    extent = None
+    finally:
+        lyr_sourcecache = None
+        dso = None
+        srs = None
+        extent = None
 
     sys.exit(rv)
 
 gdal.UseExceptions()
-- 
cgit v1.2.3