#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
from stat import S_ISREG
import sys
from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone, UTC
from math import modf
from pathlib import Path
from time import monotonic as time_monotonic
from typing import Any, Optional, NoReturn
import traceback

from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
    DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
    DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
from osgeo import gdalconst

import common
from common import (
    BadConfiguration,
    escape_identifier,
    escape_literal_str,
    getSourcePathLockFileName
)
from common_gdal import (
    gdalVersionMin,
    gdalGetMetadataItem,
    getSRS,
    getExtent,
    parseGeomType,
    parseFieldType,
    parseSubFieldType,
    parseTimeZone
)
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    importSources,
    ImportStatus
)
from export_mvt import exportMVT

def setFieldIf(cond : bool,
               attrName : str,
               val : Any,
               data : dict[str, Any],
               fldName : str,
               drvName : str,
               log = logging.warning) -> None:
    """Conditionally set a field.

    When `cond` is true, store `val` under key `attrName` in `data`.
    Otherwise leave `data` untouched and report, through the `log`
    callable, that the attribute was ignored for field `fldName` because
    the driver `drvName` doesn't support it."""
    if cond:
        data[attrName] = val
    else:
        # quote string values so they stand out in the log message
        if isinstance(val, str):
            val2 = '"' + val + '"'
        else:
            val2 = str(val)
        log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
            attrName, val2, fldName, drvName)

# pylint: disable-next=too-many-branches, too-many-statements
def validate_schema(layers : dict[str, Any],
                    drvo : Optional[gdal.Driver] = None,
                    lco_defaults : Optional[dict[str, str]] = None) -> None:
    """Validate layer creation options and schema.  The schema is
    modified in place with the parsed result.
    (We need the driver of the output dataset to determine capability on
    constraints.)

    For each layer with a creation schema, the 'options' mapping is
    turned into a list of OPTION=VALUE strings (with `lco_defaults`
    prepended), 'geometry-type' is parsed, and each field definition is
    replaced with a dictionary keyed by attribute name ('Name', 'Type',
    'SubType', 'Width', …).

    Raises ValueError if `drvo` is None, and BadConfiguration on
    nameless or duplicate fields, unknown keys, or invalid values."""

    if drvo is None:
        # the capability flags below cannot be determined without a driver
        raise ValueError('validate_schema() requires the output dataset driver')

    # Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
    if gdalVersionMin(maj=3, min=7):
        # list of capability flags supported by the CreateField() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
        drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
        # GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
        hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
    else:
        # list of flags supported by the OGRLayer::AlterFieldDefn() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
        # GetComment()/SetComment() added in 3.7.0
        drvoSupportsFieldComment = False
        hasTZFlagSupport = False

    # cache driver capabilities
    drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
    drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
    drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
    drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
    drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags

    for layername, layerdef in layers.items():
        create = layerdef.get('create', None)
        if create is None or len(create) < 1:
            logging.warning('Layer "%s" has no creation schema', layername)
            continue

        # prepend global layer creation options (dataset:create-layer-options)
        # and build the option=value list
        lco = create.get('options', None)
        if lco_defaults is not None or lco is not None:
            options = []
            if lco_defaults is not None:
                options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
            if lco is not None:
                options += [ k + '=' + str(v) for k, v in lco.items() ]
            create['options'] = options

        # parse geometry type
        create['geometry-type'] = parseGeomType(create.get('geometry-type', None))

        fields = create.get('fields', None)
        if fields is None:
            create['fields'] = []
        else:
            fields_set = set()
            for idx, fld_def in enumerate(fields):
                fld_name = fld_def.get('name', None)
                if fld_name is None or fld_name == '':
                    raise BadConfiguration(f'Field #{idx} has no name')
                if fld_name in fields_set:
                    raise BadConfiguration(f'Duplicate field "{fld_name}"')
                fields_set.add(fld_name)

                fld_def2 = { 'Name': fld_name }
                for k, v in fld_def.items():
                    # keys are matched case-insensitively
                    k2 = k.lower()
                    if k2 == 'name':
                        pass
                    elif k2 in ('alternativename', 'alias'):
                        setFieldIf(drvoSupportsFieldAlternativeName,
                                   'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)
                    elif k2 == 'comment':
                        setFieldIf(drvoSupportsFieldComment,
                                   'Comment', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)

                    elif k2 == 'type':
                        fld_def2['Type'] = parseFieldType(v)
                    elif k2 == 'subtype':
                        fld_def2['SubType'] = parseSubFieldType(v)
                    elif k2 == 'tz':
                        if hasTZFlagSupport:
                            fld_def2['TZFlag'] = parseTimeZone(v)
                        else:
                            logging.debug('Ignoring TZ="%s" on field "%s" (OGR v%s is too old)',
                                          v, fld_name, gdal.__version__)
                    elif k2 == 'width' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Width', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'precision' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Precision', v, fld_def2, fld_name, drvo.ShortName)

                    # constraints
                    elif k2 == 'default':
                        setFieldIf(drvoSupportsFieldDefault,
                                   'Default', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'nullable' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldNullable,
                                   'Nullable', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'unique' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldUnique,
                                   'Unique', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 in ('width', 'precision', 'nullable', 'unique'):
                        # known key that failed the type check above: don't
                        # misreport it as an unknown key
                        raise BadConfiguration(f'Field "{fld_name}" has invalid value '
                                               f'for key "{k}": {v!r}')
                    else:
                        raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')

                fields[idx] = fld_def2

def setOutputFieldMap(defn : ogr.FeatureDefn, sources : list[dict[str, Any]]):
    """Setup output field mapping, modifying the sources dictionary in place.

    For each source, import:field-map becomes a dictionary mapping input
    field names to output field indices in `defn` (a missing field-map
    maps each output field name to its own index), and each list of
    rules under import:value-map is rewritten as a list of
    (replace, with) tuples, where `replace` is a compiled pattern for
    rules of type 'regex'.

    Raises RuntimeError if a mapped output field does not exist or if a
    value-map rule is malformed."""
    # output field name → field index
    fieldMap = {}
    n = defn.GetFieldCount()
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        fldName = fld.GetName()
        fieldMap[fldName] = i

    for source in sources:
        source_import = source['import']

        fieldMap2 = source_import.get('field-map', None)
        if fieldMap2 is None:
            fieldMap2 = fieldMap
        else:
            if isinstance(fieldMap2, list):
                # convert list to identity dictionary
                fieldMap2 = { fld: fld for fld in fieldMap2 }

            # replace output field names with their index (existing keys are
            # reassigned, so the dictionary size doesn't change while iterating)
            for ifld, ofld in fieldMap2.items():
                i = fieldMap.get(ofld, None)
                if i is None:
                    raise RuntimeError(f'Ouput layer has no field named "{ofld}"')
                fieldMap2[ifld] = i
        source_import['field-map'] = fieldMap2

        # validate field value mapping
        valueMap = source_import.get('value-map', None)
        if valueMap is not None:
            for fldName, rules in valueMap.items():
                if rules is None:
                    continue
                if not isinstance(rules, list):
                    # store the singleton list back so the normalized rules
                    # below are not lost
                    valueMap[fldName] = rules = [rules]
                for idx, rule in enumerate(rules):
                    if rule is None or not isinstance(rule, dict):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if 'type' not in rule:
                        ruleType = rule['type'] = 'literal'
                    else:
                        ruleType = rule['type']
                    # a rule must have exactly the keys 'type', 'replace' and 'with'
                    if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
                            ruleType is None or ruleType not in ('literal', 'regex')):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if ruleType == 'regex':
                        rule['replace'] = re.compile(rule['replace'])
                    rules[idx] = ( rule['replace'], rule['with'] )

def processOutputLayer(ds : gdal.Dataset,
                       layername : str,
                       layerdef : dict[str,Any],
                       srs : Optional[osr.SpatialReference] = None,
                       cachedir : Path|None = None,
                       extent : ogr.Geometry|None = None,
                       dsTransaction : bool = False,
                       lyrcache : ogr.Layer|None = None,
                       force : bool = False) -> ImportStatus:
    """Process an output layer.

    Return ImportStatus.IMPORT_NOCHANGE without importing anything when
    a layer cache is given, `force` is not set, and
    areSourceFilesNewer() reports no newer source file.  Otherwise
    validate the layer, set its description, set up the field mapping
    and return the result of importSources().

    Raises RuntimeError if the layer doesn't exist in `ds` or lacks the
    OLCSequentialWrite capability."""
    logging.info('Processing output layer "%s"', layername)
    lyr = ds.GetLayerByName(layername)
    if lyr is None:
        raise RuntimeError(f'Failed to create output layer "{layername}"??')
    if not lyr.TestCapability(ogr.OLCSequentialWrite):
        raise RuntimeError(f'Output layer "{layername}" has no working '
                           'CreateFeature() method')

    sources = layerdef['sources']
    if not (lyrcache is None or force or
            areSourceFilesNewer(layername, sources=sources,
                                lyrcache=lyrcache,
                                cachedir=cachedir)):
        logging.info('Output layer "%s" is up to date, skipping', layername)
        return ImportStatus.IMPORT_NOCHANGE

    validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

    description = layerdef.get('description', None)
    if (description is not None and
            lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
        logging.warning('Could not set description metadata')

    # setup output field mapping in the sources dictionary
    setOutputFieldMap(lyr.GetLayerDefn(), sources)

    return importSources(dso=ds, lyr=lyr, sources=sources,
                         cachedir=cachedir, extent=extent,
                         dsoTransaction=dsTransaction,
                         lyrcache=lyrcache,
                         force=force)

def validate_sources(layers : dict[str, Any]) -> None:
    """Mangle and validate layer sources and import definitions.

    Layers without sources are removed from `layers` (with a warning).
    For the others, each source's 'source' entry is normalized to a
    dictionary with keys 'path' and 'unar'.

    Raises BadConfiguration if a source has no import definition or no
    usable source path."""
    toremove = set()
    for layername, layerdefs in layers.items():
        sources = layerdefs.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Output layer "%s" has no definition, skipping', layername)
            toremove.add(layername)
            continue

        for idx, layerdef in enumerate(sources):
            importdef = layerdef.get('import', None)
            if importdef is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import definition')

            sourcedef = layerdef.get('source', None)
            unar = None if sourcedef is None else sourcedef.get('unar', None)
            src = None if sourcedef is None else sourcedef.get('path', None)

            ds_srcpath = importdef.get('path', None)
            if src is None and unar is None and ds_srcpath is not None:
                # fallback to import:path if there is no unarchiving recipe
                src = ds_srcpath
            if unar is not None and ds_srcpath is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import source path')
            if src is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no source path')
            layerdef['source'] = { 'path': src, 'unar': unar }

    # removal is deferred as the dictionary can't shrink while being iterated
    for layername in toremove:
        layers.pop(layername)

def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
                            name : str,
                            typ : int,
                            subtyp : int = ogr.OFSTNone,
                            width : int = 0,
                            unique : Optional[bool] = None,
                            nullable : Optional[bool] = None) -> bool:
    """Validate field #idx from the layer cache table.

    Return False if the field doesn't exist or if its name, type or
    subtype differs from the expected one.  Mismatches on the
    nullable/unique constraints and on the width are logged but are
    non-fatal."""
    n = defn.GetFieldCount()
    if idx >= n:
        return False
    defn = defn.GetFieldDefn(idx)

    b = True
    name2 = defn.GetName()
    if name2 != name:
        logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
        b = False

    if nullable is not None and defn.IsNullable() != nullable:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
                        idx, name2, 'is' if defn.IsNullable() else 'isn\'t')

    if unique is not None and defn.IsUnique() != unique:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s unique',
                        idx, name2, 'is' if defn.IsUnique() else 'isn\'t')

    typ2 = defn.GetType()
    if typ2 != typ:
        logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
        b = False

    subtyp2 = defn.GetSubType()
    if subtyp2 != subtyp:
        logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
        b = False

    # NOTE(review): a width of 0 presumably means "no limit", in which case any
    # non-zero actual width is reported as too small — confirm
    width2 = defn.GetWidth()
    if width2 != 0 and (width == 0 or width2 < width):
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
                        idx, name2, width2, width)
    return b

def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
    """Validate layer cache table.

    Return True if `ds` uses the PostgreSQL driver and has a layer
    `name` whose first three fields are 'layername', 'last_updated' and
    'fingerprint' with the expected types; return False (after logging
    a warning) otherwise.  Extra fields, geometry fields and style
    tables only trigger warnings."""
    drvName = ds.GetDriver().ShortName
    if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
        logging.warning('Unsupported cache layer for output driver %s', drvName)
        return False
    lyr = ds.GetLayerByName(name)
    if lyr is None:
        logging.warning('Table "%s" does not exist', name)
        return False

#    if not (lyr.TestCapability(ogr.OLCRandomWrite) and
#            gdalVersionMin(maj=3, min=7) and
#            lyr.TestCapability(ogr.OLCUpdateFeature)):
#        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
#                        'ignoring cache', name)
#        return None

    defn = lyr.GetLayerDefn()
    # expected fields, in order; the keys match validateLayerCacheField()'s
    # keyword arguments
    fields = [
        { 'name': 'layername',    'typ': ogr.OFTString,
          'nullable': False, 'unique': True, 'width': 255 },
        { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
          'nullable': False },
        { 'name': 'fingerprint',  'typ': ogr.OFTBinary,
          'nullable': False, 'width': 32 },
    ]
    m = len(fields)
    n = defn.GetFieldCount()
    if n < m:
        # this is fatal, and `all(bs)` is False so we return False below
        logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
    elif n != m:
        logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
    bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
    if not all(bs):
        return False

    n = defn.GetGeomFieldCount()
    if n > 0:
        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                           for i in range(n) ]
        logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
                        name, n, ', '.join(geomFieldNames))

    if gdalVersionMin(maj=3, min=5):
        style = lyr.GetStyleTable()
        if style is not None:
            logging.warning('Layer cache "%s" has a style table "%s"',
                            name, style.GetLastStyleName())
    return True

# pylint: disable-next=too-many-branches, too-many-locals
def areSourceFilesNewer(layername : str,
                        sources : list[dict[str,Any]],
                        lyrcache : ogr.Layer,
                        cachedir : Optional[Path] = None) -> bool:
    """Return a boolean indicating whether the source files found on
    disk are newer than the layer cache entry for `layername`, i.e.,
    whether the layer needs to be (re-)imported.

    The result is False (layer cache is up to date) only when the last
    modification and last changed time of each source file is equal or
    lower than the `last_updated` value found in the layer cache.  It is
    True when a source file cannot be stat(2)'ed or isn't a regular
    file, or when the cache has no (dated) entry for the layer.  It is
    also False when `sources` lists no source path.

    Raises RuntimeError if the cache has several entries for the layer."""

    source_paths = set()
    for source in sources:
        # the same source_path can be used multiple times, stat(2) only once
        source_path = source['source']['path']
        source_paths.add(source_path)
    if len(source_paths) == 0:
        return False

    # most recent timestamp (in nanoseconds) across all source files
    t = None
    mtimes_ns = {}
    for source_path in source_paths:
        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
        try:
            st = os.stat(path)
            if not S_ISREG(st.st_mode):
                raise FileNotFoundError
            mtimes_ns[source_path] = st.st_mtime_ns
            # take max(mtime, ctime): if we lock source paths any update after
            # acquiring the lock will yield a value larger than time.time_ns()
            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
            if t is None or t < t2:
                t = t2
        except (OSError, ValueError):
            #logging.warning('Could not stat(%s)', path)
            return True
    assert t is not None

    attributeFilter = 'layername = ' + escape_literal_str(layername)
    logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
    lyrcache.SetAttributeFilter(attributeFilter)

    feature = lyrcache.GetNextFeature()
    if feature is None:
        # not in cache
        return True

    # field #1 is `last_updated`, field #2 is `fingerprint`
    if not feature.IsFieldSetAndNotNull(1):
        ret = True
    else:
        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
        # [ year, month, day, hour, minute, second, timezone flag ]
        dt = feature.GetFieldAsDateTime(1)
        if not gdalVersionMin(maj=3, min=8):
            tz = None # assume local time
        elif dt[6] == ogr.TZFLAG_UNKNOWN:
            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
                            'field #%d "%s", assuming local time', 1,
                            feature.GetDefnRef().GetFieldDefn(1).GetName())
            tz = None
        elif dt[6] == ogr.TZFLAG_LOCALTIME:
            tz = None
        elif dt[6] == ogr.TZFLAG_UTC:
            tz = UTC
        else:
            # other flag values are offsets from TZFLAG_UTC in units of 900s (15min)
            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
        # modf() returns the (fractional, integral) parts of the seconds
        ms, s = modf(dt[5])
        dt = datetime(
            year=dt[0],    # including century
            month=dt[1],   # 01 ≤ month ≤ 12
            day=dt[2],     # 01 ≤ day ≤ 31
            hour=dt[3],    # 00 ≤ hour ≤ 23
            minute=dt[4],  # 00 ≤ minute ≤ 59
            second=int(s), # 00 ≤ second ≤ 59
            # clamp: rounding could otherwise yield 1000000, which datetime() rejects
            microsecond=min(round(ms*1000000), 999999),
            tzinfo=tz
        )
        fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
        logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
                      layername,
                      dt.isoformat(timespec='microseconds'),
                      fpr.hex() if fpr is not None else 'NULL')
        # compare in nanoseconds (last_updated has microsecond precision)
        ret = int(dt.timestamp() * 1000000.) * 1000 < t

    if lyrcache.GetNextFeature() is not None:
        raise RuntimeError(f'Duplicate key {layername}')

    if not ret:
        for source_path, mtime_ns in sorted(mtimes_ns.items()):
            # XXX datetime.fromtimestamp() doesn't support nanosecond input
            # https://github.com/python/cpython/issues/59648
            mtime = (mtime_ns // 1000) / 1000000.
            dt = datetime.fromtimestamp(mtime)
            logging.info('Source file %s is unchanged (last modification time %s)',
                         source_path, dt.astimezone().isoformat(timespec='seconds'))
    return ret

def lockSourcePaths(layerdefs : Iterable[dict[str,Any]], lockdir: str) -> dict[str,int]:
    """Place shared locks on each source path and return their
    respective file descriptors.  We could do that one layerdef at a
    time (one output layer at a time) to reduce the time during which
    the sources are prevented from being updated/downloaded, but there
    is some value in having consistency across the whole import
    process.

    Lock files are created (if needed) relative to `lockdir`.  The
    returned dictionary maps source paths to lock file descriptors.
    On failure, descriptors opened so far are closed before the
    exception is re-raised."""
    umask = os.umask(0o002)
    lockdir_fd = None
    ret = {}
    try:
        lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
        for layerdef in layerdefs:
            for source in layerdef['sources']:
                source_path = source['source']['path']
                if source_path in ret:
                    # already locked
                    continue
                lockfile = getSourcePathLockFileName(source_path)
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                                  dir_fd=lockdir_fd)
                # register the FD right away so it gets closed should flock() fail
                ret[source_path] = lock_fd
                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                              source_path, lockfile)
                flock(lock_fd, LOCK_SH)
        return ret
    except BaseException:
        # don't leak the locks acquired so far
        releaseSourcePathLocks(ret)
        raise
    finally:
        if lockdir_fd is not None:
            try:
                os.close(lockdir_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockdir')
        os.umask(umask)

def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
    """Release shared locks on the source paths.  Closed FDs are removed
    from the dictionary in place.  FDs which could not be closed are
    logged and left in the dictionary."""
    toremove = set()
    for path, lock_fd in lock_fds.items():
        try:
            os.close(lock_fd)
        except (OSError, ValueError):
            logging.exception('Could not release lock for %s', path)
        else:
            logging.debug('Released lock for %s', path)
            toremove.add(path)
    # removal is deferred as the dictionary can't shrink while being iterated
    for path in toremove:
        lock_fds.pop(path)

# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
    parser.add_argument('--cachedir', default=None,
        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
        help='optional directory for lock files to source paths')
    parser.add_argument('--mvtdir', default=None,
        help='optional directory for Mapbox Vector Tiles (MVT)')
    parser.add_argument('--mvt-compress', default=False, action='store_true',
        help='whether to compress Mapbox Vector Tiles (MVT) files')
    parser.add_argument('--force', default=False, action='store_true',
        help='import even if no new changes were detected')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    if args.debug > 0:
        # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    layers = config.get('layers', {})
    validate_sources(layers)

    # set global GDAL/OGR configuration options
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    validate_schema(layers,
        drvo=dso.GetDriver(),
        lco_defaults=config['dataset'].get('create-layer-options', None))

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)

    # create all output layers before starting the transaction
    for layername, layerdef in layers.items():
        lyr = dso.GetLayerByName(layername)
        if lyr is not None:
            # TODO dso.DeleteLayer(layername) if --overwrite and
            # dso.TestCapability(ogr.ODsCDeleteLayer)
            # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
            continue
        if not dso.TestCapability(ogr.ODsCCreateLayer):
            raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
                               'support layer creation')
        createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))

    if args.mvtdir is not None:
        args.mvtdir = Path(args.mvtdir)
        if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it
            raise RuntimeError('Invalid value for --mvtdir')
        args.mvtdir.parent.mkdir(parents=True, exist_ok=True)

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)
    if args.lockdir_sources is None:
        sourcePathLocks = None
    else:
        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
                                          lockdir=args.lockdir_sources)

    if (dso.TestCapability(ogr.ODsCTransactions) and
            # we need SAVEPOINT support
            dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
        logging.debug('Starting transaction')
        dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
    else:
        logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
                        dso.GetDriver().ShortName)
        dsoTransaction = False

    # validate layer cache
    lyr_cache = config['dataset'].get('layercache', None)
    if lyr_cache is None:
        pass
    elif validateCacheLayer(dso, lyr_cache):
        lyr_cache = dso.GetLayerByName(lyr_cache)
    else:
        if not args.force:
            logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
            args.force = True
        lyr_cache = None

    rv = 0
    try:
        r = {}
        n = 0
        start = time_monotonic()
        for layername, layerdef in layers.items():
            r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
                                                   srs=srs,
                                                   cachedir=args.cachedir,
                                                   extent=extent,
                                                   dsTransaction=dsoTransaction,
                                                   lyrcache=lyr_cache,
                                                   force=args.force)
            n += 1
            logging.info('Import result status for layer "%s": %s', layername, str(r0))
            if r0 == ImportStatus.IMPORT_ERROR:
                rv = 1
                if dsoTransaction:
                    # clear the flag first so the handlers below don't try to
                    # roll back (or commit) a second time
                    dsoTransaction = False
                    logging.debug('Rolling back transaction')
                    # no need to catch the exception here
                    if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                        logging.error('Could not rollback transaction')
                break

        elapsed = time_monotonic() - start
        logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))

        if sourcePathLocks is not None:
            releaseSourcePathLocks(sourcePathLocks)

        # only layers with a publication definition are exported as MVT
        export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
        if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
            pass
        elif len(export_layers) == 0:
            logging.warning('--mvtdir option used but no layer has a publication definition')
        elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
                and args.mvtdir.is_dir()):
            # args.groupname is an empty list (never None) when no group is given
            logging.info('Skipping MVT export for group %s (no changes)',
                         ', '.join(args.groupname) if args.groupname else '*')
        else:
            exportMVT(dso, layers=export_layers,
                      dst=args.mvtdir,
                      default_options=config.get('vector-tiles', None),
                      compress=args.mvt_compress)

        if dsoTransaction:
            dsoTransaction = False
            logging.debug('Committing transaction')
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
                rv = 1

    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
            logging.exception('Exception occured within transaction, rolling back')
            try:
                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                    logging.error('Could not rollback transaction')
            except Exception: # pylint: disable=broad-exception-caught
                logging.exception('Could not rollback transaction')
        else:
            traceback.print_exc()
        sys.exit(1)

    finally:
        # drop the references to the GDAL/OGR objects before exiting
        lyr_cache = None
        dso = None
        extent = None
        srs = None

    sys.exit(rv)

gdal.UseExceptions()
main()