diff options
Diffstat (limited to 'import_source.py')
-rw-r--r-- | import_source.py | 103 |
1 files changed, 93 insertions, 10 deletions
diff --git a/import_source.py b/import_source.py index 8ce69a9..2d2f116 100644 --- a/import_source.py +++ b/import_source.py @@ -27,6 +27,7 @@ from fnmatch import fnmatchcase from pathlib import Path from datetime import datetime, timedelta, UTC from typing import Any, Optional +import traceback from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( @@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str], return members # pylint: disable-next=dangerous-default-value -def importSources(lyr_dst : ogr.Layer, +def importSources(dso : gdal.Dataset, lyr : ogr.Layer, sources : dict[str,Any] = {}, cachedir : Path|None = None, - extent : ogr.Geometry|None = None) -> None: - """Import source layers to lyr_dst.""" - for source in sources: - _importSource(lyr_dst, **source['source'], - args=source['import'], - cachedir=cachedir, - extent=extent) + extent : ogr.Geometry|None = None, + dsoTransaction : bool = False, + lyrcache : ogr.Layer|None = None) -> bool: + """Clear lyr and import source layers to it.""" + + layername = lyr.GetName() + if dsoTransaction: + # declare a SAVEPOINT (nested transaction) within the DS-level transaction + lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername) + logging.debug('%s', lyrTransaction) + dso.ExecuteSQL(lyrTransaction) + elif lyr.TestCapability(ogr.OLCTransactions): + # try to start transaction on the layer + logging.debug('Starting transaction on output layer "%s"', layername) + lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE + if not lyrTransaction: + logging.warning('Couldn\'t start transaction on output layer "%s"', layername) + else: + logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions', + layername) + lyrTransaction = False + + rv = True + now = datetime.now().astimezone() + try: + clearLayer(dso, lyr) # TODO conditional (only if not new)? + + for source in sources: + _importSource(lyr, **source['source'], + args=source['import'], + cachedir=cachedir, + extent=extent) + # force the PG driver to call EndCopy() to detect errors and trigger a + # rollback if needed + dso.FlushCache() + + if lyrcache is not None: + updateLayerCache(layername=layername, + cache=lyrcache, + ds=dso, + savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None, + last_updated=now) + + except Exception: # pylint: disable=broad-exception-caught + rv = False + if isinstance(lyrTransaction, str): + query = 'ROLLBACK TO ' + lyrTransaction + logging.exception('Exception occured within transaction: %s', query) + # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT + try: + dso.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not execute SQL: %s', query) + elif isinstance(lyrTransaction, bool) and lyrTransaction: + logging.exception('Exception occured within transaction on output ' + 'layer "%s": ROLLBACK', layername) + lyrTransaction = None + try: + if lyr.RollbackTransaction() != ogr.OGRERR_NONE: + logging.error('Could not rollback transaction on layer "%s"', layername) + except Exception: # pylint: disable=broad-exception-caught + logging.exception('Could not rollback transaction on layer "%s"', layername) + else: + traceback.print_exc() + + finally: + if isinstance(lyrTransaction, str): + query = 'RELEASE ' + lyrTransaction + logging.debug('%s', query) + try: + dso.ExecuteSQL(query) + except Exception: # pylint: disable=broad-exception-caught + rv = False + logging.exception('Could not execute SQL: %s', query) + elif isinstance(lyrTransaction, bool) and lyrTransaction: + try: + if lyr.CommitTransaction() != ogr.OGRERR_NONE: + rv = False + logging.error('Could not commit transaction') + except Exception: # pylint: disable=broad-exception-caught + rv = False + logging.exception('Could not commit transaction on layer "%s"', layername) + + return rv # pylint: disable-next=dangerous-default-value def _importSource(lyr : ogr.Layer, @@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any], logging.info('Forced conversion to %s: %s', ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches)) -def updateLayerCache(layername : str, cache : ogr.Layer, - last_updated : datetime) -> None: +def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer, + last_updated : datetime, + savepoint : str|None = None) -> None: """Update attributes in the layer cache for the given layer name.""" attributeFilter = 'layername = ' + escape_literal_str(layername) logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter) @@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer, else: if cache.CreateFeature(feature) != ogr.OGRERR_NONE: raise RuntimeError('Could not create new feature in layer cache') + + # force the PG driver to call EndCopy() to detect errors and trigger a + # rollback if needed + ds.FlushCache() |