diff options
-rw-r--r-- | import_source.py | 103 | ||||
-rwxr-xr-x | webmap-import | 208 |
2 files changed, 165 insertions, 146 deletions
diff --git a/import_source.py b/import_source.py index 8ce69a9..2d2f116 100644 --- a/import_source.py +++ b/import_source.py @@ -27,6 +27,7 @@ from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC from typing import Any, Optional +import traceback from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( @@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str], return members # pylint: disable-next=dangerous-default-value -def importSources(lyr_dst : ogr.Layer, +def importSources(dso : gdal.Dataset, lyr : ogr.Layer, sources : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: - """Import source layers to lyr_dst.""" - for source in sources: - _importSource(lyr_dst, **source['source'], - args=source['import'], - cachedir=cachedir, - extent=extent) + extent : ogr.Geometry|None = None, + dsoTransaction : bool = False, + lyrcache : ogr.Layer|None = None) -> bool: + """Clear lyr and import source layers to it.""" + + layername = lyr.GetName() + if dsoTransaction: + # declare a SAVEPOINT (nested transaction) within the DS-level transaction + lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername) + logging.debug('%s', lyrTransaction) + dso.ExecuteSQL(lyrTransaction) + elif lyr.TestCapability(ogr.OLCTransactions): + # try to start transaction on the layer + logging.debug('Starting transaction on output layer "%s"', layername) + lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE + if not lyrTransaction: + logging.warning('Couldn\'t start transaction on output layer "%s"', layername) + else: + logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions', + layername) + lyrTransaction = False + + rv = True + now = datetime.now().astimezone() + try: + clearLayer(dso, lyr) # TODO conditional (only if not new)? + + for source in sources: + _importSource(lyr, **source['source'], + args=source['import'], + cachedir=cachedir, + extent=extent) + # force the PG driver to call EndCopy() to detect errors and trigger a + # rollback if needed + dso.FlushCache() + + if lyrcache is not None: + updateLayerCache(layername=layername, + cache=lyrcache, + ds=dso, + savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None, + last_updated=now) + + except Exception: # pylint: disable=broad-exception-caught + rv = False + if isinstance(lyrTransaction, str): + query = 'ROLLBACK TO ' + lyrTransaction + logging.exception('Exception occured within transaction: %s', query) + # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT + try: + dso.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not execute SQL: %s', query) + elif isinstance(lyrTransaction, bool) and lyrTransaction: + logging.exception('Exception occured within transaction on output ' + 'layer "%s": ROLLBACK', layername) + lyrTransaction = None + try: + if lyr.RollbackTransaction() != ogr.OGRERR_NONE: + logging.error('Could not rollback transaction on layer "%s"', layername) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction on layer "%s"', layername) + else: + traceback.print_exc() + + finally: + if isinstance(lyrTransaction, str): + query = 'RELEASE ' + lyrTransaction + logging.debug('%s', query) + try: + dso.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + rv = False + logging.exception('Could not execute SQL: %s', query) + elif isinstance(lyrTransaction, bool) and lyrTransaction: + try: + if lyr.CommitTransaction() != ogr.OGRERR_NONE: + rv = False + logging.error('Could not commit transaction') + except Exception: # pylint: disable=broad-exception-caught + rv = False + logging.exception('Could not commit transaction on layer "%s"', layername) + + return rv # pylint: disable-next=dangerous-default-value def _importSource(lyr : ogr.Layer, @@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) -def updateLayerCache(layername : str, cache : ogr.Layer, - last_updated : datetime) -> None: +def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, + last_updated : datetime, + savepoint : str|None = None) -> None: """Update attributes in the layer cache for the given layer name.""" attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) @@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer, else: if cache.CreateFeature(feature) != ogr.OGRERR_NONE: raise RuntimeError('Could not create new feature in layer cache') + + # force the PG driver to call EndCopy() to detect errors and trigger a + # rollback if needed + ds.FlushCache() diff --git a/webmap-import b/webmap-import index 0bf9c9e..1d3f4ec 100755 --- a/webmap-import +++ b/webmap-import @@ -34,7 +34,7 @@ from pathlib import Path from typing import Any, Optional, NoReturn import traceback -from osgeo import gdal, ogr +from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( CE_None as GDAL_CE_None, DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS, @@ -64,9 +64,7 @@ from import_source import ( openOutputDS, createOutputLayer, validateOutputLayer, - clearLayer, - importSources, - updateLayerCache + importSources ) def setFieldIf(cond : bool, @@ -250,6 +248,49 @@ def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]): rule['replace'] = re.compile(rule['replace']) rules[idx] = ( rule['replace'], rule['with'] ) +def processOutputLayer(ds : gdal.Dataset, + layername : str, + layerdef : dict[str,Any], + srs : Optional[osr.SpatialReference] = None, + cachedir : Path|None = None, + extent : ogr.Geometry|None = None, + dsTransaction : bool = False, + lyrcache : ogr.Layer|None = None, + force : bool = False) -> bool: + """Process an output layer, and return a boolean indicating whether + the import was sucessful (or skipped), or not.""" + + logging.info('Processing output layer "%s"', layername) + lyr = ds.GetLayerByName(layername) + if lyr is None: + raise RuntimeError(f'Failed to create output layer "{layername}"??') + if not lyr.TestCapability(ogr.OLCSequentialWrite): + raise RuntimeError(f'Output layer "{layername}" has no working ' + 'CreateFeature() method') + + sources = layerdef['sources'] + if not (lyrcache is None or force or + areSourceFilesNewer(layername, sources=sources, + lyrcache=lyrcache, + cachedir=cachedir)): + logging.info('Output layer "%s" is up to date, skipping', layername) + return True + + validateOutputLayer(lyr, srs=srs, options=layerdef['create']) + + description = layerdef.get('description', None) + if (description is not None and + lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None): + logging.warning('Could not set description metadata') + + # setup output field mapping in the sources dictionary + setOutputFieldMap(lyr.GetLayerDefn(), sources) + + return importSources(dso=ds, lyr=lyr, sources=sources, + cachedir=cachedir, extent=extent, + dsoTransaction=dsTransaction, + lyrcache=lyrcache) + def validate_sources(layers : dict[str, Any]) -> None: """Mangle and validate layer sources and import definitions""" toremove = set() @@ -381,7 +422,7 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool: def areSourceFilesNewer(layername : str, sources : dict[str,Any], - cache : ogr.Layer, + lyrcache : ogr.Layer, cachedir : Optional[Path] = None) -> bool: """Return a boolean indicating whether the layer cache is up to date with respect to the source files found on disk. That is, the last modification @@ -416,10 +457,10 @@ def areSourceFilesNewer(layername : str, assert t is not None attributeFilter = 'layername = ' + escape_literal_str(layername) - logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) - cache.SetAttributeFilter(attributeFilter) + logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter) + lyrcache.SetAttributeFilter(attributeFilter) - feature = cache.GetNextFeature() + feature = lyrcache.GetNextFeature() if feature is None: # not in cache return True @@ -458,7 +499,7 @@ def areSourceFilesNewer(layername : str, dt.isoformat(timespec='microseconds')) ret = int(dt.timestamp() * 1000000.) * 1000 < t - if cache.GetNextFeature() is not None: + if lyrcache.GetNextFeature() is not None: raise RuntimeError(f'Duplicate key {layername}') if not ret: @@ -512,34 +553,6 @@ def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None: for path in toremove: lock_fds.pop(path) -def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool: - """Either commit or rollback the transaction, depending on a condition - indicating whether there are pending new changes to be committed.""" - if commit: - logging.debug('Committing transaction') - if obj.CommitTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not commit transaction') - else: - logging.info('No changes detected, rolling back transaction') - # don't call rollbackTransactionTry() as we don't want to catch exceptions - if obj.RollbackTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not rollback transaction') - return False - -def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool: - """Try to rollback the current transaction on the dataset or layer, ignoring - any exception but logging errors and returning a boolean indicating - success.""" - try: - if obj.RollbackTransaction() == ogr.OGRERR_NONE: - return True - logging.error('Could not rollback transaction') - except Exception: # pylint: disable=broad-exception-caught - logging.exception('Could not rollback transaction') - return False - # pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements def main() -> NoReturn: common.init_logger(app=os.path.basename(__file__), level=logging.INFO) @@ -626,129 +639,52 @@ def main() -> NoReturn: # validate layer cache lyr_cache = config['dataset'].get('layercache', None) - if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache): + if lyr_cache is None: + pass + elif validateCacheLayer(dso, lyr_cache): + lyr_cache = dso.GetLayerByName(lyr_cache) + else: if not args.force: logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache) args.force = True lyr_cache = None - bChanged = False rv = 0 try: for layername, layerdef in layers.items(): - logging.info('Processing output layer "%s"', layername) - lyr = dso.GetLayerByName(layername) - if lyr is None: - raise RuntimeError(f'Failed to create output layer "{layername}"??') - if not lyr.TestCapability(ogr.OLCSequentialWrite): - raise RuntimeError(f'Output layer "{layername}" has no working ' - 'CreateFeature() method') - - sources = layerdef['sources'] - if not (lyr_cache is None or args.force or - areSourceFilesNewer(layername, sources=sources, - cache=dso.GetLayerByName(lyr_cache), - cachedir=cachedir)): - logging.info('Output layer "%s" is up to date, skipping', layername) - continue - - validateOutputLayer(lyr, srs=srs, options=layerdef['create']) - - # setup output field mapping in the sources dictionary - setOutputFieldMap(lyr.GetLayerDefn(), sources) - - if dsoTransaction: - lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername) - logging.debug(lyrTransaction) - dso.ExecuteSQL(lyrTransaction) - elif lyr.TestCapability(ogr.OLCTransactions): - # start transaction if possible - logging.debug('Starting transaction') - lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE - else: - logging.warning('Unsafe update, output layer "%s" does not support transactions', - layername) - lyrTransaction = False - - try: - description = layerdef.get('description', None) - if (description is not None and - lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None): - logging.warning('Could not set description metadata') - - now = datetime.now().astimezone() - clearLayer(dso, lyr) # TODO conditional (only if not new)? - importSources(lyr_dst=lyr, sources=sources, - cachedir=cachedir, extent=extent) - - # force the PG driver to call EndCopy() to detect errors and trigger a - # rollback if needed - dso.FlushCache() - - if lyr_cache is None: - bChanged0 = True - else: - updateLayerCache(layername=layername, - cache=dso.GetLayerByName(lyr_cache), - last_updated=now) - bChanged0 = True # TODO fingerprint lyr to detect changes - dso.FlushCache() - bChanged = bChanged or bChanged0 - - if isinstance(lyrTransaction, str): - if not (bChanged0 or args.force): - logging.info('No changes detected, rolling back to previous savepoint') - query = 'ROLLBACK TO ' + lyrTransaction - logging.debug(query) - try: - dso.ExecuteSQL(query) - except Exception: # pylint: disable=broad-exception-caught - logging.exception('Could not %s', query) - rv = 1 - elif isinstance(lyrTransaction, bool) and lyrTransaction: - lyrTransaction = False - if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force): - rv = 1 - - except Exception: # pylint: disable=broad-exception-caught - if isinstance(lyrTransaction, str): - query = 'ROLLBACK TO ' + lyrTransaction - logging.exception('Exception occured within transaction, ' - 'rolling back to previous savepoint') - logging.debug(query) - dso.ExecuteSQL(query) - elif isinstance(lyrTransaction, bool) and lyrTransaction: - logging.exception('Exception occured within transaction, rolling back') - rollbackTransactionTry(lyr) - else: - traceback.print_exc() + if not processOutputLayer(dso, layername, layerdef, + srs=srs, + cachedir=cachedir, + extent=extent, + dsTransaction=dsoTransaction, + lyrcache=lyr_cache, + force=args.force): rv = 1 - finally: - lyr = None # close output layer - if isinstance(lyrTransaction, str): - query = 'RELEASE ' + lyrTransaction - logging.debug(query) - dso.ExecuteSQL(query) - if sourcePathLocks is not None: releaseSourcePathLocks(sourcePathLocks) if dsoTransaction: - # commit transaction dsoTransaction = False - if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force): + logging.debug('Committing transaction') + if dso.CommitTransaction() != ogr.OGRERR_NONE: + logging.error('Could not commit transaction') rv = 1 except Exception: # pylint: disable=broad-exception-caught + rv = 1 if dsoTransaction: logging.exception('Exception occured within transaction, rolling back') - rollbackTransactionTry(lyr) + try: + if dso.RollbackTransaction() != ogr.OGRERR_NONE: + logging.error('Could not rollback transaction') + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction') else: traceback.print_exc() - sys.exit(1) finally: + lyr_cache = None dso = None srs = None extent = None |