 -rw-r--r--  common.py        |  16
 -rw-r--r--  config.yml       |   2
 -rw-r--r--  import_source.py |  69
 -rw-r--r--  schema.sql       |  84
 -rwxr-xr-x  webmap-download  |   7
 -rwxr-xr-x  webmap-import    | 361
 6 files changed, 495 insertions(+), 44 deletions(-)
diff --git a/common.py b/common.py
--- a/common.py
+++ b/common.py
@@ -27,6 +27,7 @@ from stat import S_ISDIR
 import math
 import logging
 from typing import Any, Optional, Never
+from hashlib import sha256
 
 from xdg.BaseDirectory import xdg_config_home
 import yaml
@@ -143,6 +144,11 @@ def parse_config(path : Optional[Path] = None,
 
     return config
 
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+    """Return the name of the lockfile associated with a source path."""
+    return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
 def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
     """Format a number of bytes to a SI unit"""
 
@@ -174,6 +180,16 @@ def escape_identifier(identifier : str) -> str:
     # SQL:1999 delimited identifier
     return '"' + identifier.replace('"', '""') + '"'
 
+def escape_literal_str(literal : str) -> str:
+    """Escape the given character string literal, cf.
+    swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+    if literal is None or '\x00' in literal:
+        raise RuntimeError(f'Invalid literal "{literal}"')
+
+    # SQL:1999 character string literal
+    return '\'' + literal.replace('\'', '\'\'') + '\''
+
 ######
 # The function definitions below are taken from cpython's source code
 
diff --git a/config.yml b/config.yml
--- a/config.yml
+++ b/config.yml
@@ -108,6 +108,8 @@ dataset:
   open-options-publish:
     USER: webmap_guest
 
+  sourcecache-layername: public.sourcecache
+
   # Optional dictionary of default layer creation options, cf.
   # https://gdal.org/drivers/vector/pg.html#layer-creation-options or
   # https://gdal.org/drivers/vector/gpkg.html#layer-creation-options
diff --git a/import_source.py b/import_source.py
index e30a245..1fa754c 100644
--- a/import_source.py
+++ b/import_source.py
@@ -37,7 +37,7 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 
-from common import BadConfiguration, escape_identifier
+from common import BadConfiguration, escape_identifier, escape_literal_str
 from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
 
 def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
@@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str],
             logging.debug('Ignoring archive member %s', name)
     return members
 
+def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
+    """Get the unique triplet (source_path, archive_member, layername)
+    for the layer source which can be used for attribute filters and
+    lookups."""
+    importdef = source['import']
+    sourcedef = source['source']
+    source_path = sourcedef['path']
+    archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
+    layername = importdef.get('layername', '')
+    return (source_path, archive_member, layername)
+
 # pylint: disable-next=dangerous-default-value
 def importSources(lyr_dst : ogr.Layer,
+                  dso : gdal.Dataset,
                   sources : dict[str,Any] = {},
                   cachedir : Path|None = None,
-                  extent : ogr.Geometry|None = None) -> None:
-    """Import source layers"""
+                  mtimes : dict[str, int|None]|None = None,
+                  lyr_sourcecache : ogr.Layer|None = None,
+                  extent : ogr.Geometry|None = None) -> bool:
+    """Import source layers."""
+    bChanged = False
     for source in sources:
         _importSource(lyr_dst, **source['source'],
                       args=source['import'],
                       cachedir=cachedir, extent=extent)
+        # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
+        # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
+        dso.FlushCache()
+
+        if lyr_sourcecache is None:
+            bChanged = True
+            continue
+
+        source_path, archive_member, layername = getSourceCacheKey(source)
+        attributeFilter = ('(source_path,archive_member,layername) = ('
+                           + escape_literal_str(source_path) + ','
+                           + escape_literal_str(archive_member) + ','
+                           + escape_literal_str(layername) + ')')
+        lyr_sourcecache.SetAttributeFilter(attributeFilter)
+
+        feature = lyr_sourcecache.GetNextFeature()
+        if feature is not None:
+            logging.debug('Updating existing feature in source cache for %s', attributeFilter)
+            update = True
+            assert lyr_sourcecache.GetNextFeature() is None
+        else:
+            logging.debug('Creating new feature in source cache for %s', attributeFilter)
+            update = False
+            feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
+            feature.SetFieldString(0, source_path)
+            feature.SetFieldString(1, archive_member)
+            feature.SetFieldString(2, layername)
+
+        mtime_ns = mtimes[source_path]
+        if mtime_ns is None:
+            feature.SetFieldNull(3)
+        else:
+            feature.SetFieldInteger64(3, mtime_ns)
+
+        if update:
+            # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+            if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
+                raise RuntimeError('Could not update feature in sourcecache')
+        else:
+            if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
+                raise RuntimeError('Could not create new feature in sourcecache')
+
+        # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
+        dso.FlushCache()
+        bChanged = True # TODO fingerprint the features to detect changes
+
+    return bChanged
+
 # pylint: disable-next=dangerous-default-value
 def _importSource(lyr : ogr.Layer,
                   path : str = '/nonexistent',
diff --git a/schema.sql b/schema.sql
--- a/schema.sql
+++ b/schema.sql
@@ -2,8 +2,8 @@
 -- PostgreSQL database dump
 --
 
--- Dumped from database version 15.8 (Debian 15.8-0+deb12u1)
--- Dumped by pg_dump version 15.8 (Debian 15.8-0+deb12u1)
+-- Dumped from database version 15.12 (Debian 15.12-0+deb12u2)
+-- Dumped by pg_dump version 15.12 (Debian 15.12-0+deb12u2)
 
 SET statement_timeout = 0;
 SET lock_timeout = 0;
@@ -2756,6 +2756,49 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
 
 
 --
+-- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+--
+
+CREATE TABLE public.sourcecache (
+    ogc_fid bigint NOT NULL,
+    source_path character varying(2047) NOT NULL,
+    archive_member character varying(2047) NOT NULL,
+    layername character varying(255) NOT NULL,
+    mtime_ns bigint
+);
+
+
+ALTER TABLE public.sourcecache OWNER TO webmap_import;
+
+--
+-- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+--
+
+COMMENT ON TABLE public.sourcecache IS 'Metadata cache for sources files';
+
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+--
+
+CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+--
+
+ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+
+
+--
 -- Name: dammar ogc_fid; Type: DEFAULT; Schema: postgis; Owner: webmap_import
 --
 
@@ -3127,6 +3170,13 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT ne
 
 
 --
+-- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+
+
+--
 -- Name: dammar dammar_DammID_key; Type: CONSTRAINT; Schema: postgis; Owner: webmap_import
 --
 
@@ -4007,6 +4057,22 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
 
 
 --
+-- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+    ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+
+
+--
+-- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+    ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+
+
+--
 -- Name: dammar_wkb_geometry_geom_idx; Type: INDEX; Schema: postgis; Owner: webmap_import
 --
 
@@ -5157,6 +5223,20 @@ GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap
 
 
 --
+-- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+
+
+--
+-- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+
+
+--
 -- PostgreSQL database dump complete
 --
 
diff --git a/webmap-download b/webmap-download
index edb624e..05aa2c4 100755
--- a/webmap-download
+++ b/webmap-download
@@ -44,12 +44,11 @@ import argparse
 import itertools
 from pathlib import Path
 from email.utils import parsedate_to_datetime, formatdate
-from hashlib import sha256
 from typing import Optional, NoReturn, Never
 
 import requests
 import common
-from common import BadConfiguration
+from common import BadConfiguration, getSourcePathLockFileName
 
 def download_trystream(url : str, **kwargs) -> requests.Response:
     """GET a url, trying a number of times. Return immediately after the
@@ -309,9 +308,9 @@ def main() -> NoReturn:
         # place an exclusive lock on a lockfile as the destination can be used by other layers
         # hence might be updated in parallel
         if lockdir_fd is not None:
-            lockfile = sha256(dest.encode('utf-8')).hexdigest() + '.lck'
+            lockfile = getSourcePathLockFileName(dest)
             # use O_TRUNC to bump lockfile's mtime
-            lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644,
+            lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o664,
                               dir_fd=lockdir_fd)
         try:
             if lockdir_fd is not None:
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
 
 # pylint: disable=invalid-name, missing-module-docstring, fixme
 
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
 import os
+from stat import S_ISREG
 import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
 import logging
 import argparse
 import re
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional, NoReturn
 import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
 )
 from osgeo import gdalconst
 import common
-from common import BadConfiguration, escape_identifier
+from common import (
+    BadConfiguration,
+    escape_identifier,
+    escape_literal_str,
+    getSourcePathLockFileName
+)
 from common_gdal import (
     gdalVersionMin,
     gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
     createOutputLayer,
     validateOutputLayer,
     clearLayer,
-    importSources
+    importSources,
+    getSourceCacheKey
 )
 
 def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
     for layername in toremove:
         layers.pop(layername)
 
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+                             name : str,
+                             typ : int,
+                             subtyp : int = ogr.OFSTNone,
+                             width : int = 0,
+                             isNullable : Optional[bool] = None) -> bool:
+    """Validate field #idx from the source cache layer/table."""
+    n = defn.GetFieldCount()
+    if idx >= n:
+        return False
+    defn = defn.GetFieldDefn(idx)
+
+    b = True
+    name2 = defn.GetName()
+    if name2 != name:
+        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+                        idx, name2, name)
+        b = False
+
+    if isNullable is not None and defn.IsNullable() != isNullable:
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+                        idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+    typ2 = defn.GetType()
+    if typ2 != typ:
+        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+                        idx, name2,
+                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+        b = False
+
+    subtyp2 = defn.GetSubType()
+    if subtyp2 != subtyp:
+        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+                        'ignoring cache', idx, name2,
+                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+        b = False
+
+    width2 = defn.GetWidth()
+    if width2 != 0 and (width == 0 or width2 < width):
+        # non-fatal
+        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+                        idx, name2, width2, width)
+    return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+                        force : bool = False) -> ogr.Layer|None:
+    """Get the source cache layer/table and validate it."""
+    if name is None:
+        return None
+
+    lyr = ds.GetLayerByName(name)
+    if lyr is None:
+        if not force: # show a warning if args.force is not set
+            logging.warning('Table "%s" does not exist, implying --force', name)
+        return None
+
+#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+#            gdalVersionMin(maj=3, min=9) and
+#            lyr.TestCapability(ogr.OLCUpdateFeature)):
+#        # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+#        # which was added in 3.9
+#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+#                        'ignoring cache', name)
+#        return None
+
+    defn = lyr.GetLayerDefn()
+    fields = [
+        { 'name': 'source_path', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'archive_member', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 2047 },
+        { 'name': 'layername', 'typ': ogr.OFTString,
+          'isNullable': False, 'width': 255 },
+        { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+          'isNullable': True },
+    ]
+    m = len(fields)
+    n = defn.GetFieldCount()
+    if n < m:
+        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+                        name, n, m)
+    elif n != m:
+        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+    if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+        return None
+
+    n = defn.GetGeomFieldCount()
+    if n > 0:
+        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+                           for i in range(n) ]
+        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+                        name, n, ', '.join(geomFieldNames))
+
+    if gdalVersionMin(maj=3, min=5):
+        style = lyr.GetStyleTable()
+        if style is not None:
+            logging.warning('Source cache layer/table "%s" has a style table "%s"',
+                            name, style.GetLastStyleName())
+    return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+                      cachedir : Optional[Path] = None) -> dict[str,int|None]:
+    """Return a dictionary mapping each source path to its last modification
+    time (in nanoseconds), or None if said source path is not a regular file."""
+    mtimes_ns = {}
+    for source in sources:
+        # the same source_path can be used multiple times, stat(2) only once
+        source_path = source['source']['path']
+        mtimes_ns[source_path] = None
+    for source_path in mtimes_ns:
+        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+        try:
+            st = os.stat(path)
+            if not S_ISREG(st.st_mode):
+                raise FileNotFoundError
+            mtimes_ns[source_path] = st.st_mtime_ns
+        except (OSError, ValueError):
+            #logging.warning('Could not stat(%s)', path)
+            pass
+    return mtimes_ns
+
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+                           mtimes_ns : dict[str,int|None]) -> bool:
+    """Return a boolean indicating whether the source cache layer/table is up to
+    date with respect to the dictionary mapping each source path to its last
+    modification time.  That is, every triplet (source_path, archive_member,
+    layername) needs to be present in cache, and the corresponding mtime_ns
+    needs to match the stat(2) output of the source file on disk."""
+    ks = set()
+    for source in sources:
+        source_path, archive_member, layername = k = getSourceCacheKey(source)
+        if k in ks:
+            raise BadConfiguration(f'Duplicate key {k}')
+        ks.add(k)
+
+    if len(ks) == 0:
+        return False
+
+    attributeFilter = []
+    for source_path, archive_member, layername in ks:
+        attributeFilter.append('(' + escape_literal_str(source_path) + ','
+                                   + escape_literal_str(archive_member) + ','
+                                   + escape_literal_str(layername) + ')')
+    if len(attributeFilter) == 1:
+        attributeFilter = '= ' + attributeFilter[0]
+    else:
+        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+    lyr.SetAttributeFilter(attributeFilter)
+
+    cache = {}
+    feature = lyr.GetNextFeature()
+    while feature is not None:
+        k = (
+            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+        )
+        if any(k0 is None for k0 in k):
+            # no column in the key may be NULL
+            raise RuntimeError(f'Bad key {k}')
+        if k in cache:
+            raise RuntimeError(f'Duplicate key {k}')
+        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+        feature = lyr.GetNextFeature()
+
+    for k in ks:
+        a = cache.get(k, None)
+        b = mtimes_ns[k[0]]
+        if a is None or b is None or a != b:
+            return True
+
+    for source_path in sorted({ p for p,_,_ in ks }):
+        # XXX datetime.fromtimestamp() doesn't support nanosecond input
+        # https://github.com/python/cpython/issues/59648
+        mtime = (mtimes_ns[source_path] // 1000)/1000000.
+        dt = datetime.fromtimestamp(mtime)
+        logging.info('Source file %s is unchanged (last modification time %s)',
+                     source_path, dt.astimezone().isoformat(timespec='seconds'))
+    return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at a
+    time) to reduce the time during which the sources are prevented from being
+    updated/downloaded, but there is some value in having consistency across the
+    whole import process."""
+    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+    try:
+        ret = {}
+        for layerdef in layerdefs:
+            for source in layerdef['sources']:
+                source_path = source['source']['path']
+                lockfile = getSourcePathLockFileName(source_path)
+                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+                                  dir_fd=lockdir_fd)
+                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+                              source_path, lockfile)
+                flock(lock_fd, LOCK_SH)
+                ret[source_path] = lock_fd
+        return ret
+    finally:
+        try:
+            os.close(lockdir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close lockdir')
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+    """Release shared locks on the source paths.  Closed FDs are removed from
+    the dictionary in place."""
+    toremove = set()
+    for path, lock_fd in lock_fds.items():
+        try:
+            os.close(lock_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not release lock for %s', path)
+        else:
+            logging.debug('Released lock for %s', path)
+            toremove.add(path)
+    for path in toremove:
+        lock_fds.pop(path)
+
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+    """Either commit or rollback the transaction, depending on a condition
+    indicating whether there are pending new changes to be committed."""
+    if commit:
+        logging.debug('Committing transaction')
+        if obj.CommitTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not commit transaction')
+    else:
+        logging.info('No changes detected, rolling back transaction')
+        # don't call rollbackTransactionTry() as we don't want to catch exceptions
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    return False
+
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+    """Try to rollback the current transaction on the dataset or layer, ignoring
+    any exception but logging errors and returning a boolean indicating
+    success."""
+    try:
+        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+            return True
+        logging.error('Could not rollback transaction')
+    except Exception: # pylint: disable=broad-exception-caught
+        logging.exception('Could not rollback transaction')
+    return False
+
 # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
 def main() -> NoReturn:
     common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
     parser.add_argument('--debug', action='count', default=0,
                         help=argparse.SUPPRESS)
     parser.add_argument('--lockfile', default=None,
-                        help='obtain an exclusive lock before starting unpacking and importing')
+                        help='obtain an exclusive lock before processing')
+    parser.add_argument('--lockdir-sources', default=None,
+                        help='optional directory for lock files to source paths')
+    parser.add_argument('--force', default=False, action='store_true',
+                        help='import even if no new changes were detected')
     parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
     args = parser.parse_args()
 
@@ -313,8 +577,8 @@ def main() -> NoReturn:
 
     dso = openOutputDS(config['dataset'])
     validate_schema(layers,
-        drvo=dso.GetDriver(),
-        lco_defaults=config['dataset'].get('create-layer-options', None))
+                    drvo=dso.GetDriver(),
+                    lco_defaults=config['dataset'].get('create-layer-options', None))
 
     # get configured Spatial Reference System and extent
     srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
             createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
 
     cachedir = Path(args.cachedir) if args.cachedir is not None else None
+    if args.lockdir_sources is None:
+        sourcePathLocks = None
+    else:
+        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+                                          lockdir=args.lockdir_sources)
 
     if (dso.TestCapability(ogr.ODsCTransactions) and
         dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@ def main() -> NoReturn:
                       dso.GetDriver().ShortName)
         dsoTransaction = False
 
+    # get source cache layer/table
+    lyr_sourcecache = getSourceCacheLayer(dso,
+                                          config['dataset'].get('sourcecache-layername', None),
+                                          force=args.force)
+
+    bChanged = False
     rv = 0
     try:
         for layername, layerdef in layers.items():
@@ -360,9 +635,18 @@ def main() -> NoReturn:
             if not lyr.TestCapability(ogr.OLCSequentialWrite):
                 raise RuntimeError(f'Output layer "{layername}" has no working '
                                    'CreateFeature() method')
-            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             sources = layerdef['sources']
+            if lyr_sourcecache is None:
+                sourcesMtimeNS = None
+            else:
+                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+                                                             mtimes_ns=sourcesMtimeNS)):
+                    logging.info('Output layer "%s" is up to date, skipping', layername)
+                    continue
+
+            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
 
             # setup output field mapping in the sources dictionary
             setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
                 lyrTransaction = False
 
             try:
-                clearLayer(dso, lyr)
+                clearLayer(dso, lyr) # TODO conditional (only if not new)?
 
                 description = layerdef.get('description', None)
                 if (description is not None and
                         lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                     logging.warning('Could not set description metadata')
 
-                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+                                          mtimes=sourcesMtimeNS,
+                                          lyr_sourcecache=lyr_sourcecache,
+                                          cachedir=cachedir, extent=extent)
+                bChanged = bChanged or bChanged0
 
-                if isinstance(lyrTransaction, bool) and lyrTransaction:
-                    # commit transaction
-                    logging.debug('Committing transaction')
+                if isinstance(lyrTransaction, str):
+                    if not (bChanged0 or args.force):
+                        logging.info('No changes detected, rolling back to previous savepoint')
+                        query = 'ROLLBACK TO ' + lyrTransaction
+                        logging.debug(query)
+                        try:
+                            dso.ExecuteSQL(query)
+                        except Exception: # pylint: disable=broad-exception-caught
+                            logging.exception('Could not %s', query)
+                            rv = 1
+                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                     lyrTransaction = False
-                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
-                        logging.error('Could not commit transaction')
+                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                         rv = 1
 
             except Exception: # pylint: disable=broad-exception-caught
                 if isinstance(lyrTransaction, str):
                     query = 'ROLLBACK TO ' + lyrTransaction
-                    logging.exception('Exception occured in transaction, %s', query)
+                    logging.exception('Exception occured within transaction, '
+                                      'rolling back to previous savepoint')
                     logging.debug(query)
                     dso.ExecuteSQL(query)
                 elif isinstance(lyrTransaction, bool) and lyrTransaction:
-                    logging.exception('Exception occured in transaction, rolling back')
-                    try:
-                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
-                            logging.error('Could not rollback transaction')
-                    except RuntimeError:
-                        logging.exception('Could not rollback transaction')
+                    logging.exception('Exception occured within transaction, rolling back')
+                    rollbackTransactionTry(lyr)
                 else:
                     traceback.print_exc()
                 rv = 1
@@ -422,29 +714,28 @@ def main() -> NoReturn:
                 logging.debug(query)
                 dso.ExecuteSQL(query)
 
+        if sourcePathLocks is not None:
+            releaseSourcePathLocks(sourcePathLocks)
+
         if dsoTransaction:
             # commit transaction
-            logging.debug('Committing transaction')
             dsoTransaction = False
-            if dso.CommitTransaction() != ogr.OGRERR_NONE:
-                logging.error('Could not commit transaction')
+            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                 rv = 1
 
     except Exception: # pylint: disable=broad-exception-caught
         if dsoTransaction:
-            logging.exception('Exception occured in transaction, rolling back')
-            try:
-                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
-                    logging.error('Could not rollback transaction')
-            except RuntimeError:
-                logging.exception('Could not rollback transaction')
+            logging.exception('Exception occured within transaction, rolling back')
+            rollbackTransactionTry(lyr)
         else:
            traceback.print_exc()
-        rv = 1
+        sys.exit(1)
 
-    dso = None
-    srs = None
-    extent = None
+    finally:
+        lyr_sourcecache = None
+        dso = None
+        srs = None
+        extent = None
 
     sys.exit(rv)
 
 gdal.UseExceptions()
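
For reference, the metadata cached by this change can be inspected directly in PostgreSQL. A minimal sketch, assuming the public.sourcecache table created above; the source_path literal is only a placeholder:

    -- Show what the importer has cached for one source file,
    -- converting mtime_ns (nanoseconds since the epoch) back to a timestamp.
    SELECT source_path, archive_member, layername,
           to_timestamp(mtime_ns / 1e9) AS mtime
      FROM public.sourcecache
     WHERE source_path = 'path/to/source.zip'
     ORDER BY mtime DESC NULLS LAST;

A missing row, or an mtime_ns that no longer matches the file's stat(2) output, is what makes webmap-import treat the corresponding output layer as dirty and re-import it (unless --force is given).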