#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
from stat import S_ISREG
import sys
from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback

from osgeo import gdal, ogr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
    DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
    DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
from osgeo import gdalconst

import common
from common import (
    BadConfiguration,
    escape_identifier,
    escape_literal_str,
    getSourcePathLockFileName,
)
from common_gdal import (
    gdalVersionMin,
    gdalGetMetadataItem,
    getSRS,
    getExtent,
    parseGeomType,
    parseFieldType,
    parseSubFieldType,
    parseTimeZone,
)
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    clearLayer,
    importSources,
    getSourceCacheKey,
)

def setFieldIf(cond : bool,
               attrName : str,
               val : Any,
               data : dict[str, Any],
               fldName : str,
               drvName : str,
               log = logging.warning) -> None:
    """Conditionally set attribute attrName to val in the data dictionary,
    logging the ignored attribute otherwise."""
    if cond:
        data[attrName] = val
    else:
        val2 = '"' + val + '"' if isinstance(val, str) else str(val)
        log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
            attrName, val2, fldName, drvName)
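# Illustrative use (not part of the import flow; the GPKG driver choice and
# the field name 'size_ha' are just examples): with a driver that advertises
# NOT NULL support the attribute is set, otherwise only a message is logged.
#
#   drv = gdal.GetDriverByName('GPKG')
#   data : dict[str, Any] = {}
#   setFieldIf(bool(gdalGetMetadataItem(drv, GDAL_DCAP_NOTNULL_FIELDS)),
#              'Nullable', False, data, 'size_ha', drv.ShortName,
#              log=logging.debug)
#   # data is now {'Nullable': False} on GPKG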
# pylint: disable-next=too-many-branches
def validate_schema(layers : dict[str, Any],
                    drvo : Optional[gdal.Driver] = None,
                    lco_defaults : Optional[dict[str, str]] = None) -> None:
    """Validate layer creation options and schema.  The schema is modified in
    place with the parsed result.  (We need the driver of the output dataset
    to determine capability on constraints.)"""
    # Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
    if gdalVersionMin(maj=3, min=7):
        # list of capability flags supported by the CreateField() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = (drvoFieldDefnFlags.split(' ')
                              if drvoFieldDefnFlags is not None else [])
        drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
        # GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
        hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
    else:
        # list of flags supported by the OGRLayer::AlterFieldDefn() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = (drvoFieldDefnFlags.split(' ')
                              if drvoFieldDefnFlags is not None else [])
        # GetComment()/SetComment() added in 3.7.0
        drvoSupportsFieldComment = False
        hasTZFlagSupport = False

    # cache driver capabilities
    drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
    drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
                                 gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
    drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
                               gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
    drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
                                gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
    drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags

    for layername, layerdef in layers.items():
        create = layerdef.get('create', None)
        if create is None or len(create) < 1:
            logging.warning('Layer "%s" has no creation schema', layername)
            continue

        # prepend global layer creation options (dataset:create-layer-options)
        # and build the option=value list
        lco = create.get('options', None)
        if lco_defaults is not None or lco is not None:
            options = []
            if lco_defaults is not None:
                options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
            if lco is not None:
                options += [ k + '=' + str(v) for k, v in lco.items() ]
            create['options'] = options

        # parse geometry type
        create['geometry-type'] = parseGeomType(create.get('geometry-type', None))

        fields = create.get('fields', None)
        if fields is None:
            create['fields'] = []
        else:
            fields_set = set()
            for idx, fld_def in enumerate(fields):
                fld_name = fld_def.get('name', None)
                if fld_name is None or fld_name == '':
                    raise BadConfiguration(f'Field #{idx} has no name')
                if fld_name in fields_set:
                    raise BadConfiguration(f'Duplicate field "{fld_name}"')
                fields_set.add(fld_name)

                fld_def2 = { 'Name': fld_name }
                for k, v in fld_def.items():
                    k2 = k.lower()
                    if k2 == 'name':
                        pass
                    elif k2 in ('alternativename', 'alias'):
                        setFieldIf(drvoSupportsFieldAlternativeName,
                                   'AlternativeName', v, fld_def2, fld_name,
                                   drvo.ShortName, log=logging.debug)
                    elif k2 == 'comment':
                        setFieldIf(drvoSupportsFieldComment,
                                   'Comment', v, fld_def2, fld_name,
                                   drvo.ShortName, log=logging.debug)
                    elif k2 == 'type':
                        fld_def2['Type'] = parseFieldType(v)
                    elif k2 == 'subtype':
                        fld_def2['SubType'] = parseSubFieldType(v)
                    elif k2 == 'tz':
                        if hasTZFlagSupport:
                            fld_def2['TZFlag'] = parseTimeZone(v)
                        else:
                            logging.debug('Ignoring TZ="%s" on field "%s" (OGR v%s is too old)',
                                          v, fld_name, gdal.__version__)
                    elif k2 == 'width' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Width', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'precision' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Precision', v, fld_def2, fld_name, drvo.ShortName)
                    # constraints
                    elif k2 == 'default':
                        setFieldIf(drvoSupportsFieldDefault,
                                   'Default', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'nullable' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldNullable,
                                   'Nullable', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'unique' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldUnique,
                                   'Unique', v, fld_def2, fld_name, drvo.ShortName)
                    else:
                        raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')
                fields[idx] = fld_def2
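# A minimal configuration sketch accepted by validate_schema() (layer and
# field names are hypothetical, and the driver must be available in the GDAL
# build at hand):
#
#   layers = {'my_layer': {'create': {
#       'geometry-type': 'MultiPolygon',
#       'options': {'OVERWRITE': 'YES'},
#       'fields': [{'name': 'size_ha', 'type': 'Real', 'nullable': False}],
#   }}}
#   validate_schema(layers, drvo=gdal.GetDriverByName('PostgreSQL'),
#                   lco_defaults={'GEOMETRY_NAME': 'geom'})
#
# After the call create['options'] is the flattened list
# ['GEOMETRY_NAME=geom', 'OVERWRITE=YES'] (defaults first), and each field
# definition is rewritten with OGR attribute keys, e.g.
# {'Name': 'size_ha', 'Type': ogr.OFTReal, 'Nullable': False} — with
# 'Nullable' silently dropped if the driver lacks NOT NULL support.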
def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]) -> None:
    """Setup output field mapping, modifying the sources dictionary in place."""
    fieldMap = {}
    n = defn.GetFieldCount()
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        fldName = fld.GetName()
        fieldMap[fldName] = i

    for source in sources:
        source_import = source['import']
        fieldMap2 = source_import.get('field-map', None)
        if fieldMap2 is None:
            fieldMap2 = fieldMap
        else:
            if isinstance(fieldMap2, list):
                # convert list to identity dictionary
                fieldMap2 = { fld: fld for fld in fieldMap2 }
            for ifld, ofld in fieldMap2.items():
                i = fieldMap.get(ofld, None)
                if i is None:
                    raise RuntimeError(f'Output layer has no field named "{ofld}"')
                fieldMap2[ifld] = i
        source_import['field-map'] = fieldMap2

        # validate field value mapping
        valueMap = source_import.get('value-map', None)
        if valueMap is not None:
            for fldName, rules in valueMap.items():
                if rules is None:
                    continue
                if not isinstance(rules, list):
                    rules = [rules]
                for idx, rule in enumerate(rules):
                    if rule is None or not isinstance(rule, dict):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if 'type' not in rule:
                        ruleType = rule['type'] = 'literal'
                    else:
                        ruleType = rule['type']
                    if ('replace' not in rule or 'with' not in rule or len(rule) != 3
                            or ruleType is None or ruleType not in ('literal', 'regex')):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if ruleType == 'regex':
                        rule['replace'] = re.compile(rule['replace'])
                    rules[idx] = ( rule['replace'], rule['with'] )
                # reassign so the (replace, with) tuples stick even when a
                # bare rule was wrapped into a list above
                valueMap[fldName] = rules

def validate_sources(layers : dict[str, Any]) -> None:
    """Mangle and validate layer sources and import definitions."""
    toremove = set()
    for layername, layerdefs in layers.items():
        sources = layerdefs.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Output layer "%s" has no definition, skipping', layername)
            toremove.add(layername)
            continue
        for idx, layerdef in enumerate(sources):
            importdef = layerdef.get('import', None)
            if importdef is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import definition')

            sourcedef = layerdef.get('source', None)
            unar = None if sourcedef is None else sourcedef.get('unar', None)
            src = None if sourcedef is None else sourcedef.get('path', None)

            ds_srcpath = importdef.get('path', None)
            if src is None and unar is None and ds_srcpath is not None:
                # fallback to import:path if there is no unarchiving recipe
                src = ds_srcpath
            if unar is not None and ds_srcpath is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import source path')
            if src is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no source path')
            layerdef['source'] = { 'path': src, 'unar': unar }

    for layername in toremove:
        layers.pop(layername)
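# For illustration (path and layer name hypothetical), a bare source entry
# such as
#
#   {'import': {'path': 'data/reservat.gpkg'}}
#
# passes validation and is normalized into
#
#   {'import': {'path': 'data/reservat.gpkg'},
#    'source': {'path': 'data/reservat.gpkg', 'unar': None}}
#
# i.e., absent an unarchiving recipe the import path doubles as source path.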
def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int, name : str,
                             typ : int,
                             subtyp : int = ogr.OFSTNone,
                             width : int = 0,
                             isNullable : Optional[bool] = None) -> bool:
    """Validate field #idx from the source cache layer/table."""
    n = defn.GetFieldCount()
    if idx >= n:
        return False
    defn = defn.GetFieldDefn(idx)

    b = True
    name2 = defn.GetName()
    if name2 != name:
        logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
                        idx, name2, name)
        b = False
    if isNullable is not None and defn.IsNullable() != isNullable:
        # non-fatal
        logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
                        idx, name2, 'is' if defn.IsNullable() else 'is not')
    typ2 = defn.GetType()
    if typ2 != typ:
        logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
                        idx, name2, ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
        b = False
    subtyp2 = defn.GetSubType()
    if subtyp2 != subtyp:
        logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
                        'ignoring cache', idx, name2,
                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
        b = False
    width2 = defn.GetWidth()
    if width2 != 0 and (width == 0 or width2 < width):
        # non-fatal
        logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
                        idx, name2, width2, width)
    return b

def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
                        force : bool = False) -> ogr.Layer|None:
    """Get the source cache layer/table and validate it."""
    if name is None:
        return None

    lyr = ds.GetLayerByName(name)
    if lyr is None:
        if not force:
            # show a warning if args.force is not set
            logging.warning('Table "%s" does not exist, implying --force', name)
        return None

    # if not (lyr.TestCapability(ogr.OLCRandomWrite) and
    #         gdalVersionMin(maj=3, min=9) and
    #         lyr.TestCapability(ogr.OLCUpdateFeature)):
    #     # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
    #     # which was added in 3.9
    #     logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
    #                     'ignoring cache', name)
    #     return None

    defn = lyr.GetLayerDefn()
    fields = [
        { 'name': 'source_path',    'typ': ogr.OFTString,    'isNullable': False, 'width': 2047 },
        { 'name': 'archive_member', 'typ': ogr.OFTString,    'isNullable': False, 'width': 2047 },
        { 'name': 'layername',      'typ': ogr.OFTString,    'isNullable': False, 'width': 255 },
        { 'name': 'mtime_ns',       'typ': ogr.OFTInteger64, 'isNullable': True },
    ]

    m = len(fields)
    n = defn.GetFieldCount()
    if n < m:
        logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
                        name, n, m)
    elif n != m:
        logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
    if not all(validateSourceCacheField(defn, i, **fld) for i, fld in enumerate(fields)):
        return None

    n = defn.GetGeomFieldCount()
    if n > 0:
        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                           for i in range(n) ]
        logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
                        name, n, ', '.join(geomFieldNames))

    if gdalVersionMin(maj=3, min=5):
        style = lyr.GetStyleTable()
        if style is not None:
            logging.warning('Source cache layer/table "%s" has a style table "%s"',
                            name, style.GetLastStyleName())
    return lyr
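# The cache table is thus expected to have the following shape (PostgreSQL
# flavour shown for illustration; the table name is whatever
# dataset:sourcecache-layername is configured to, and the actual DDL depends
# on the output driver):
#
#   CREATE TABLE source_cache (
#       source_path    varchar(2047) NOT NULL,
#       archive_member varchar(2047) NOT NULL,
#       layername      varchar(255)  NOT NULL,
#       mtime_ns       bigint
#   );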
def getSourcesMtimeNS(sources : dict[str, Any],
                      cachedir : Optional[Path] = None) -> dict[str, int|None]:
    """Return a dictionary mapping each source path to its last modification
    time (in nanoseconds), or None if said source path is not a regular file."""
    mtimes_ns = {}
    for source in sources:
        # the same source_path can be used multiple times, stat(2) only once
        source_path = source['source']['path']
        mtimes_ns[source_path] = None
    for source_path in mtimes_ns:
        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
        try:
            st = os.stat(path)
            if not S_ISREG(st.st_mode):
                raise FileNotFoundError
            mtimes_ns[source_path] = st.st_mtime_ns
        except (OSError, ValueError):
            #logging.warning('Could not stat(%s)', path)
            pass
    return mtimes_ns

def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str, Any],
                           mtimes_ns : dict[str, int|None]) -> bool:
    """Return a boolean indicating whether the source cache layer/table is up
    to date with respect to the dictionary mapping each source path to its
    last modification time.  That is, every triplet (source_path,
    archive_member, layername) needs to be present in cache, and the
    corresponding mtime_ns needs to match the stat(2) output of the source
    file on disk."""
    ks = set()
    for source in sources:
        source_path, archive_member, layername = k = getSourceCacheKey(source)
        if k in ks:
            raise BadConfiguration(f'Duplicate key {k}')
        ks.add(k)
    if len(ks) == 0:
        return False

    attributeFilter = []
    for source_path, archive_member, layername in ks:
        attributeFilter.append('(' + escape_literal_str(source_path) + ','
                                   + escape_literal_str(archive_member) + ','
                                   + escape_literal_str(layername) + ')')
    if len(attributeFilter) == 1:
        attributeFilter = '= ' + attributeFilter[0]
    else:
        attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
    attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
    logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
    lyr.SetAttributeFilter(attributeFilter)

    cache = {}
    feature = lyr.GetNextFeature()
    while feature is not None:
        k = (
            feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
            feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
            feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
        )
        if any(k0 is None for k0 in k):
            # no column in the key may be NULL
            raise RuntimeError(f'Bad key {k}')
        if k in cache:
            raise RuntimeError(f'Duplicate key {k}')
        cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
        feature = lyr.GetNextFeature()

    for k in ks:
        a = cache.get(k, None)
        b = mtimes_ns[k[0]]
        if a is None or b is None or a != b:
            return True

    for source_path in sorted({ p for p, _, _ in ks }):
        # XXX datetime.fromtimestamp() doesn't support nanosecond input
        # https://github.com/python/cpython/issues/59648
        mtime = (mtimes_ns[source_path] // 1000) / 1000000.
        dt = datetime.fromtimestamp(mtime)
        logging.info('Source file %s is unchanged (last modification time %s)',
                     source_path, dt.astimezone().isoformat(timespec='seconds'))
    return False
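# E.g., with two cached sources the generated attribute filter reads (paths,
# archive members and layer names hypothetical, quoting as performed by
# escape_literal_str()):
#
#   (source_path,archive_member,layername)
#       IN (('a.zip','a.shp','lyr1'),('b.zip','b.shp','lyr2'))
#
# so a single SetAttributeFilter() pass fetches all relevant cache rows.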
def lockSourcePaths(layerdefs : dict[str, Any], lockdir : str) -> dict[str, int]:
    """Place shared locks on each source path and return their respective file
    descriptors.  We could do that one layerdef at a time (one output layer at
    a time) to reduce the time during which the sources are prevented from
    being updated/downloaded, but there is some value in having consistency
    across the whole import process."""
    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
    try:
        ret = {}
        for layerdef in layerdefs:
            for source in layerdef['sources']:
                source_path = source['source']['path']
                lockfile = getSourcePathLockFileName(source_path)
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                                  dir_fd=lockdir_fd)
                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                              source_path, lockfile)
                flock(lock_fd, LOCK_SH)
                ret[source_path] = lock_fd
        return ret
    finally:
        try:
            os.close(lockdir_fd)
        except (OSError, ValueError):
            logging.exception('Could not close lockdir')

def releaseSourcePathLocks(lock_fds : dict[str, int]) -> None:
    """Release shared locks on the source paths.  Closed FDs are removed from
    the dictionary in place."""
    toremove = set()
    for path, lock_fd in lock_fds.items():
        try:
            os.close(lock_fd)
        except (OSError, ValueError):
            logging.exception('Could not release lock for %s', path)
        else:
            logging.debug('Released lock for %s', path)
            toremove.add(path)
    for path in toremove:
        lock_fds.pop(path)

def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
    """Either commit or rollback the transaction, depending on a condition
    indicating whether there are pending new changes to be committed."""
    if commit:
        logging.debug('Committing transaction')
        if obj.CommitTransaction() == ogr.OGRERR_NONE:
            return True
        logging.error('Could not commit transaction')
    else:
        logging.info('No changes detected, rolling back transaction')
        # don't call rollbackTransactionTry() as we don't want to catch exceptions
        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
            return True
        logging.error('Could not rollback transaction')
    return False

def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
    """Try to rollback the current transaction on the dataset or layer,
    ignoring any exception but logging errors and returning a boolean
    indicating success."""
    try:
        if obj.RollbackTransaction() == ogr.OGRERR_NONE:
            return True
        logging.error('Could not rollback transaction')
    except Exception: # pylint: disable=broad-exception-caught
        logging.exception('Could not rollback transaction')
    return False
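# Typical pairing (sketch; the lock directory path is hypothetical):
#
#   locks = lockSourcePaths(layerdefs=layers.values(), lockdir='/run/lock/webmap')
#   try:
#       ...  # import while sources cannot be swapped out underneath us
#   finally:
#       releaseSourcePathLocks(locks)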
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files on source paths')
    parser.add_argument('--force', default=False, action='store_true',
                        help='import even if no new changes were detected')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    if args.debug > 0:
        # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')
    layers = config.get('layers', {})
    validate_sources(layers)

    # set global GDAL/OGR configuration options
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    validate_schema(layers,
                    drvo=dso.GetDriver(),
                    lco_defaults=config['dataset'].get('create-layer-options', None))

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    extent = getExtent(config.get('extent', None), srs=srs)[0]

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)

    # create all output layers before starting the transaction
    for layername, layerdef in layers.items():
        lyr = dso.GetLayerByName(layername)
        if lyr is not None:
            # TODO dso.DeleteLayer(layername) if --overwrite and
            # dso.TestCapability(ogr.ODsCDeleteLayer)
            # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
            continue
        if not dso.TestCapability(ogr.ODsCCreateLayer):
            raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
                               'support layer creation')
        createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))

    cachedir = Path(args.cachedir) if args.cachedir is not None else None

    if args.lockdir_sources is None:
        sourcePathLocks = None
    else:
        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
                                          lockdir=args.lockdir_sources)

    if (dso.TestCapability(ogr.ODsCTransactions) and
            dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
        logging.debug('Starting transaction')
        dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
    else:
        logging.warning('Output driver %s does not support dataset transactions or '
                        'SQL SAVEPOINTs', dso.GetDriver().ShortName)
        dsoTransaction = False

    # get source cache layer/table
    lyr_sourcecache = getSourceCacheLayer(dso,
                                          config['dataset'].get('sourcecache-layername', None),
                                          force=args.force)

    bChanged = False
    rv = 0
    try:
        for layername, layerdef in layers.items():
            logging.info('Processing output layer "%s"', layername)
            lyr = dso.GetLayerByName(layername)
            if lyr is None:
                raise RuntimeError(f'Failed to create output layer "{layername}"??')
            if not lyr.TestCapability(ogr.OLCSequentialWrite):
                raise RuntimeError(f'Output layer "{layername}" has no working '
                                   'CreateFeature() method')

            sources = layerdef['sources']
            if lyr_sourcecache is None:
                sourcesMtimeNS = None
            else:
                sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
                if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
                                                             mtimes_ns=sourcesMtimeNS)):
                    logging.info('Output layer "%s" is up to date, skipping', layername)
                    continue

            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

            # setup output field mapping in the sources dictionary
            setOutputFieldMap(lyr.GetLayerDefn(), sources)

            if dsoTransaction:
                lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
                logging.debug(lyrTransaction)
                dso.ExecuteSQL(lyrTransaction)
            elif lyr.TestCapability(ogr.OLCTransactions):
                # start transaction if possible
                logging.debug('Starting transaction')
                lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
            else:
                logging.warning('Unsafe update, output layer "%s" does not support transactions',
                                layername)
                lyrTransaction = False
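            # Per-layer guard (illustration): with a PostgreSQL or GPKG output
            # the above issues a plain SQL savepoint nested inside the
            # dataset-level transaction, e.g. for a hypothetical layer "vbk":
            #
            #   SAVEPOINT "savept_vbk"
            #   ...                        -- per-layer DML
            #   ROLLBACK TO "savept_vbk"   -- only when nothing changed
            #   RELEASE "savept_vbk"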
            try:
                clearLayer(dso, lyr) # TODO conditional (only if not new)?

                description = layerdef.get('description', None)
                if (description is not None and
                        lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                    logging.warning('Could not set description metadata')

                bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
                                          mtimes=sourcesMtimeNS,
                                          lyr_sourcecache=lyr_sourcecache,
                                          cachedir=cachedir, extent=extent)
                bChanged = bChanged or bChanged0

                if isinstance(lyrTransaction, str):
                    if not (bChanged0 or args.force):
                        logging.info('No changes detected, rolling back to previous savepoint')
                        query = 'ROLLBACK TO ' + lyrTransaction
                        logging.debug(query)
                        try:
                            dso.ExecuteSQL(query)
                        except Exception: # pylint: disable=broad-exception-caught
                            logging.exception('Could not %s', query)
                            rv = 1
                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                    lyrTransaction = False
                    if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
                        rv = 1
            except Exception: # pylint: disable=broad-exception-caught
                if isinstance(lyrTransaction, str):
                    query = 'ROLLBACK TO ' + lyrTransaction
                    logging.exception('Exception occurred within transaction, '
                                      'rolling back to previous savepoint')
                    logging.debug(query)
                    dso.ExecuteSQL(query)
                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                    logging.exception('Exception occurred within transaction, rolling back')
                    rollbackTransactionTry(lyr)
                else:
                    traceback.print_exc()
                rv = 1
            finally:
                lyr = None # close output layer
                if isinstance(lyrTransaction, str):
                    query = 'RELEASE ' + lyrTransaction
                    logging.debug(query)
                    dso.ExecuteSQL(query)

        if sourcePathLocks is not None:
            releaseSourcePathLocks(sourcePathLocks)

        if dsoTransaction:
            # commit transaction
            dsoTransaction = False
            if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
                rv = 1
    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
            logging.exception('Exception occurred within transaction, rolling back')
            # roll back on the dataset: the dataset-level transaction is held
            # by dso, and lyr was reset to None above
            rollbackTransactionTry(dso)
        else:
            traceback.print_exc()
        sys.exit(1)
    finally:
        lyr_sourcecache = None
        dso = None
        srs = None
        extent = None

    sys.exit(rv)

gdal.UseExceptions()
main()
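# Example invocation (sketch; the script name, paths and group name below are
# hypothetical):
#
#   ./import-layers --cachedir /var/cache/webmap \
#       --lockfile /run/lock/webmap/import.lock mygroup
#
# With --force the matching layers are reimported even when the source cache
# deems every source file unchanged.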