aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-24 11:48:46 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-24 18:41:48 +0200
commit31feb9d0a559d5eb268db04ca91855b959568811 (patch)
treed15238ae3de33930642470229706fb51a8ef7605 /import_source.py
parent80882acd6ba407847fed0ef308e440b88796e0e1 (diff)
Move layer transactional logic to importSources().
It's much clearer that way. The destination layer is cleared and updated in that function, so it makes sense if that's also where transactions (or SAVEPOINTs) are committed or rollback'ed.
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py103
1 files changed, 93 insertions, 10 deletions
diff --git a/import_source.py b/import_source.py
index 8ce69a9..2d2f116 100644
--- a/import_source.py
+++ b/import_source.py
@@ -27,6 +27,7 @@ from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
from typing import Any, Optional
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str],
return members
# pylint: disable-next=dangerous-default-value
-def importSources(lyr_dst : ogr.Layer,
+def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers to lyr_dst."""
- for source in sources:
- _importSource(lyr_dst, **source['source'],
- args=source['import'],
- cachedir=cachedir,
- extent=extent)
+ extent : ogr.Geometry|None = None,
+ dsoTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None) -> bool:
+ """Clear lyr and import source layers to it."""
+
+ layername = lyr.GetName()
+ if dsoTransaction:
+ # declare a SAVEPOINT (nested transaction) within the DS-level transaction
+ lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
+ logging.debug('%s', lyrTransaction)
+ dso.ExecuteSQL(lyrTransaction)
+ elif lyr.TestCapability(ogr.OLCTransactions):
+ # try to start transaction on the layer
+ logging.debug('Starting transaction on output layer "%s"', layername)
+ lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
+ if not lyrTransaction:
+ logging.warning('Couldn\'t start transaction on output layer "%s"', layername)
+ else:
+ logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions',
+ layername)
+ lyrTransaction = False
+
+ rv = True
+ now = datetime.now().astimezone()
+ try:
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+
+ for source in sources:
+ _importSource(lyr, **source['source'],
+ args=source['import'],
+ cachedir=cachedir,
+ extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyrcache is not None:
+ updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso,
+ savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
+ last_updated=now)
+
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ if isinstance(lyrTransaction, str):
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.exception('Exception occured within transaction: %s', query)
+ # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.exception('Exception occured within transaction on output '
+ 'layer "%s": ROLLBACK', layername)
+ lyrTransaction = None
+ try:
+ if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction on layer "%s"', layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"', layername)
+ else:
+ traceback.print_exc()
+
+ finally:
+ if isinstance(lyrTransaction, str):
+ query = 'RELEASE ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ try:
+ if lyr.CommitTransaction() != ogr.OGRERR_NONE:
+ rv = False
+ logging.error('Could not commit transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not commit transaction on layer "%s"', layername)
+
+ return rv
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
@@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(layername : str, cache : ogr.Layer,
- last_updated : datetime) -> None:
+def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
+ last_updated : datetime,
+ savepoint : str|None = None) -> None:
"""Update attributes in the layer cache for the given layer name."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
@@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer,
else:
if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not create new feature in layer cache')
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ ds.FlushCache()