From 31feb9d0a559d5eb268db04ca91855b959568811 Mon Sep 17 00:00:00 2001 From: Guilhem Moulin Date: Thu, 24 Apr 2025 11:48:46 +0200 Subject: Move layer transactional logic to importSources(). It's much clearer that way. The destination layer is cleared and updated in that function, so it makes sense if that's also where transactions (or SAVEPOINTs) are committed or rollback'ed. --- webmap-import | 208 ++++++++++++++++++++-------------------------------------- 1 file changed, 72 insertions(+), 136 deletions(-) (limited to 'webmap-import') diff --git a/webmap-import b/webmap-import index 0bf9c9e..1d3f4ec 100755 --- a/webmap-import +++ b/webmap-import @@ -34,7 +34,7 @@ from pathlib import Path from typing import Any, Optional, NoReturn import traceback -from osgeo import gdal, ogr +from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( CE_None as GDAL_CE_None, DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS, @@ -64,9 +64,7 @@ from import_source import ( openOutputDS, createOutputLayer, validateOutputLayer, - clearLayer, - importSources, - updateLayerCache + importSources ) def setFieldIf(cond : bool, @@ -250,6 +248,49 @@ def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]): rule['replace'] = re.compile(rule['replace']) rules[idx] = ( rule['replace'], rule['with'] ) +def processOutputLayer(ds : gdal.Dataset, + layername : str, + layerdef : dict[str,Any], + srs : Optional[osr.SpatialReference] = None, + cachedir : Path|None = None, + extent : ogr.Geometry|None = None, + dsTransaction : bool = False, + lyrcache : ogr.Layer|None = None, + force : bool = False) -> bool: + """Process an output layer, and return a boolean indicating whether + the import was sucessful (or skipped), or not.""" + + logging.info('Processing output layer "%s"', layername) + lyr = ds.GetLayerByName(layername) + if lyr is None: + raise RuntimeError(f'Failed to create output layer "{layername}"??') + if not lyr.TestCapability(ogr.OLCSequentialWrite): + raise RuntimeError(f'Output layer "{layername}" has no working ' + 'CreateFeature() method') + + sources = layerdef['sources'] + if not (lyrcache is None or force or + areSourceFilesNewer(layername, sources=sources, + lyrcache=lyrcache, + cachedir=cachedir)): + logging.info('Output layer "%s" is up to date, skipping', layername) + return True + + validateOutputLayer(lyr, srs=srs, options=layerdef['create']) + + description = layerdef.get('description', None) + if (description is not None and + lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None): + logging.warning('Could not set description metadata') + + # setup output field mapping in the sources dictionary + setOutputFieldMap(lyr.GetLayerDefn(), sources) + + return importSources(dso=ds, lyr=lyr, sources=sources, + cachedir=cachedir, extent=extent, + dsoTransaction=dsTransaction, + lyrcache=lyrcache) + def validate_sources(layers : dict[str, Any]) -> None: """Mangle and validate layer sources and import definitions""" toremove = set() @@ -381,7 +422,7 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool: def areSourceFilesNewer(layername : str, sources : dict[str,Any], - cache : ogr.Layer, + lyrcache : ogr.Layer, cachedir : Optional[Path] = None) -> bool: """Return a boolean indicating whether the layer cache is up to date with respect to the source files found on disk. That is, the last modification @@ -416,10 +457,10 @@ def areSourceFilesNewer(layername : str, assert t is not None attributeFilter = 'layername = ' + escape_literal_str(layername) - logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) - cache.SetAttributeFilter(attributeFilter) + logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter) + lyrcache.SetAttributeFilter(attributeFilter) - feature = cache.GetNextFeature() + feature = lyrcache.GetNextFeature() if feature is None: # not in cache return True @@ -458,7 +499,7 @@ def areSourceFilesNewer(layername : str, dt.isoformat(timespec='microseconds')) ret = int(dt.timestamp() * 1000000.) * 1000 < t - if cache.GetNextFeature() is not None: + if lyrcache.GetNextFeature() is not None: raise RuntimeError(f'Duplicate key {layername}') if not ret: @@ -512,34 +553,6 @@ def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None: for path in toremove: lock_fds.pop(path) -def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool: - """Either commit or rollback the transaction, depending on a condition - indicating whether there are pending new changes to be committed.""" - if commit: - logging.debug('Committing transaction') - if obj.CommitTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not commit transaction') - else: - logging.info('No changes detected, rolling back transaction') - # don't call rollbackTransactionTry() as we don't want to catch exceptions - if obj.RollbackTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not rollback transaction') - return False - -def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool: - """Try to rollback the current transaction on the dataset or layer, ignoring - any exception but logging errors and returning a boolean indicating - success.""" - try: - if obj.RollbackTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not rollback transaction') - except Exception: # pylint: disable=broad-exception-caught - logging.exception('Could not rollback transaction') - return False - # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements def main() -> NoReturn: common.init_logger(app=os.path.basename(__file__), level=logging.INFO) @@ -626,129 +639,52 @@ def main() -> NoReturn: # validate layer cache lyr_cache = config['dataset'].get('layercache', None) - if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache): + if lyr_cache is None: + pass + elif validateCacheLayer(dso, lyr_cache): + lyr_cache = dso.GetLayerByName(lyr_cache) + else: if not args.force: logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache) args.force = True lyr_cache = None - bChanged = False rv = 0 try: for layername, layerdef in layers.items(): - logging.info('Processing output layer "%s"', layername) - lyr = dso.GetLayerByName(layername) - if lyr is None: - raise RuntimeError(f'Failed to create output layer "{layername}"??') - if not lyr.TestCapability(ogr.OLCSequentialWrite): - raise RuntimeError(f'Output layer "{layername}" has no working ' - 'CreateFeature() method') - - sources = layerdef['sources'] - if not (lyr_cache is None or args.force or - areSourceFilesNewer(layername, sources=sources, - cache=dso.GetLayerByName(lyr_cache), - cachedir=cachedir)): - logging.info('Output layer "%s" is up to date, skipping', layername) - continue - - validateOutputLayer(lyr, srs=srs, options=layerdef['create']) - - # setup output field mapping in the sources dictionary - setOutputFieldMap(lyr.GetLayerDefn(), sources) - - if dsoTransaction: - lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername) - logging.debug(lyrTransaction) - dso.ExecuteSQL(lyrTransaction) - elif lyr.TestCapability(ogr.OLCTransactions): - # start transaction if possible - logging.debug('Starting transaction') - lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE - else: - logging.warning('Unsafe update, output layer "%s" does not support transactions', - layername) - lyrTransaction = False - - try: - description = layerdef.get('description', None) - if (description is not None and - lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None): - logging.warning('Could not set description metadata') - - now = datetime.now().astimezone() - clearLayer(dso, lyr) # TODO conditional (only if not new)? - importSources(lyr_dst=lyr, sources=sources, - cachedir=cachedir, extent=extent) - - # force the PG driver to call EndCopy() to detect errors and trigger a - # rollback if needed - dso.FlushCache() - - if lyr_cache is None: - bChanged0 = True - else: - updateLayerCache(layername=layername, - cache=dso.GetLayerByName(lyr_cache), - last_updated=now) - bChanged0 = True # TODO fingerprint lyr to detect changes - dso.FlushCache() - bChanged = bChanged or bChanged0 - - if isinstance(lyrTransaction, str): - if not (bChanged0 or args.force): - logging.info('No changes detected, rolling back to previous savepoint') - query = 'ROLLBACK TO ' + lyrTransaction - logging.debug(query) - try: - dso.ExecuteSQL(query) - except Exception: # pylint: disable=broad-exception-caught - logging.exception('Could not %s', query) - rv = 1 - elif isinstance(lyrTransaction, bool) and lyrTransaction: - lyrTransaction = False - if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force): - rv = 1 - - except Exception: # pylint: disable=broad-exception-caught - if isinstance(lyrTransaction, str): - query = 'ROLLBACK TO ' + lyrTransaction - logging.exception('Exception occured within transaction, ' - 'rolling back to previous savepoint') - logging.debug(query) - dso.ExecuteSQL(query) - elif isinstance(lyrTransaction, bool) and lyrTransaction: - logging.exception('Exception occured within transaction, rolling back') - rollbackTransactionTry(lyr) - else: - traceback.print_exc() + if not processOutputLayer(dso, layername, layerdef, + srs=srs, + cachedir=cachedir, + extent=extent, + dsTransaction=dsoTransaction, + lyrcache=lyr_cache, + force=args.force): rv = 1 - finally: - lyr = None # close output layer - if isinstance(lyrTransaction, str): - query = 'RELEASE ' + lyrTransaction - logging.debug(query) - dso.ExecuteSQL(query) - if sourcePathLocks is not None: releaseSourcePathLocks(sourcePathLocks) if dsoTransaction: - # commit transaction dsoTransaction = False - if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force): + logging.debug('Committing transaction') + if dso.CommitTransaction() != ogr.OGRERR_NONE: + logging.error('Could not commit transaction') rv = 1 except Exception: # pylint: disable=broad-exception-caught + rv = 1 if dsoTransaction: logging.exception('Exception occured within transaction, rolling back') - rollbackTransactionTry(lyr) + try: + if dso.RollbackTransaction() != ogr.OGRERR_NONE: + logging.error('Could not rollback transaction') + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction') else: traceback.print_exc() - sys.exit(1) finally: + lyr_cache = None dso = None srs = None extent = None -- cgit v1.2.3