aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py103
1 files changed, 93 insertions, 10 deletions
diff --git a/import_source.py b/import_source.py
index 8ce69a9..2d2f116 100644
--- a/import_source.py
+++ b/import_source.py
@@ -27,6 +27,7 @@ from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
from typing import Any, Optional
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str],
return members
# pylint: disable-next=dangerous-default-value
-def importSources(lyr_dst : ogr.Layer,
+def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers to lyr_dst."""
- for source in sources:
- _importSource(lyr_dst, **source['source'],
- args=source['import'],
- cachedir=cachedir,
- extent=extent)
+ extent : ogr.Geometry|None = None,
+ dsoTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None) -> bool:
+ """Clear lyr and import source layers to it."""
+
+ layername = lyr.GetName()
+ if dsoTransaction:
+ # declare a SAVEPOINT (nested transaction) within the DS-level transaction
+ lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
+ logging.debug('%s', lyrTransaction)
+ dso.ExecuteSQL(lyrTransaction)
+ elif lyr.TestCapability(ogr.OLCTransactions):
+ # try to start transaction on the layer
+ logging.debug('Starting transaction on output layer "%s"', layername)
+ lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
+ if not lyrTransaction:
+ logging.warning('Couldn\'t start transaction on output layer "%s"', layername)
+ else:
+ logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions',
+ layername)
+ lyrTransaction = False
+
+ rv = True
+ now = datetime.now().astimezone()
+ try:
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+
+ for source in sources:
+ _importSource(lyr, **source['source'],
+ args=source['import'],
+ cachedir=cachedir,
+ extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyrcache is not None:
+ updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso,
+ savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
+ last_updated=now)
+
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ if isinstance(lyrTransaction, str):
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.exception('Exception occured within transaction: %s', query)
+ # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.exception('Exception occured within transaction on output '
+ 'layer "%s": ROLLBACK', layername)
+ lyrTransaction = None
+ try:
+ if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction on layer "%s"', layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"', layername)
+ else:
+ traceback.print_exc()
+
+ finally:
+ if isinstance(lyrTransaction, str):
+ query = 'RELEASE ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ try:
+ if lyr.CommitTransaction() != ogr.OGRERR_NONE:
+ rv = False
+ logging.error('Could not commit transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not commit transaction on layer "%s"', layername)
+
+ return rv
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
@@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(layername : str, cache : ogr.Layer,
- last_updated : datetime) -> None:
+def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
+ last_updated : datetime,
+ savepoint : str|None = None) -> None:
"""Update attributes in the layer cache for the given layer name."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
@@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer,
else:
if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not create new feature in layer cache')
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ ds.FlushCache()