aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-24 11:48:46 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-24 18:41:48 +0200
commit31feb9d0a559d5eb268db04ca91855b959568811 (patch)
treed15238ae3de33930642470229706fb51a8ef7605
parent80882acd6ba407847fed0ef308e440b88796e0e1 (diff)
Move layer transactional logic to importSources().
It's much clearer that way. The destination layer is cleared and updated in that function, so it makes sense if that's also where transactions (or SAVEPOINTs) are committed or rollback'ed.
-rw-r--r--import_source.py103
-rwxr-xr-xwebmap-import208
2 files changed, 165 insertions, 146 deletions
diff --git a/import_source.py b/import_source.py
index 8ce69a9..2d2f116 100644
--- a/import_source.py
+++ b/import_source.py
@@ -27,6 +27,7 @@ from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
from typing import Any, Optional
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str],
return members
# pylint: disable-next=dangerous-default-value
-def importSources(lyr_dst : ogr.Layer,
+def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers to lyr_dst."""
- for source in sources:
- _importSource(lyr_dst, **source['source'],
- args=source['import'],
- cachedir=cachedir,
- extent=extent)
+ extent : ogr.Geometry|None = None,
+ dsoTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None) -> bool:
+ """Clear lyr and import source layers to it."""
+
+ layername = lyr.GetName()
+ if dsoTransaction:
+ # declare a SAVEPOINT (nested transaction) within the DS-level transaction
+ lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
+ logging.debug('%s', lyrTransaction)
+ dso.ExecuteSQL(lyrTransaction)
+ elif lyr.TestCapability(ogr.OLCTransactions):
+ # try to start transaction on the layer
+ logging.debug('Starting transaction on output layer "%s"', layername)
+ lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
+ if not lyrTransaction:
+ logging.warning('Couldn\'t start transaction on output layer "%s"', layername)
+ else:
+ logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions',
+ layername)
+ lyrTransaction = False
+
+ rv = True
+ now = datetime.now().astimezone()
+ try:
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+
+ for source in sources:
+ _importSource(lyr, **source['source'],
+ args=source['import'],
+ cachedir=cachedir,
+ extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyrcache is not None:
+ updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso,
+ savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
+ last_updated=now)
+
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ if isinstance(lyrTransaction, str):
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.exception('Exception occured within transaction: %s', query)
+ # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.exception('Exception occured within transaction on output '
+ 'layer "%s": ROLLBACK', layername)
+ lyrTransaction = None
+ try:
+ if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction on layer "%s"', layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"', layername)
+ else:
+ traceback.print_exc()
+
+ finally:
+ if isinstance(lyrTransaction, str):
+ query = 'RELEASE ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ try:
+ if lyr.CommitTransaction() != ogr.OGRERR_NONE:
+ rv = False
+ logging.error('Could not commit transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not commit transaction on layer "%s"', layername)
+
+ return rv
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
@@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(layername : str, cache : ogr.Layer,
- last_updated : datetime) -> None:
+def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
+ last_updated : datetime,
+ savepoint : str|None = None) -> None:
"""Update attributes in the layer cache for the given layer name."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
@@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer,
else:
if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not create new feature in layer cache')
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ ds.FlushCache()
diff --git a/webmap-import b/webmap-import
index 0bf9c9e..1d3f4ec 100755
--- a/webmap-import
+++ b/webmap-import
@@ -34,7 +34,7 @@ from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
-from osgeo import gdal, ogr
+from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
CE_None as GDAL_CE_None,
DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
@@ -64,9 +64,7 @@ from import_source import (
openOutputDS,
createOutputLayer,
validateOutputLayer,
- clearLayer,
- importSources,
- updateLayerCache
+ importSources
)
def setFieldIf(cond : bool,
@@ -250,6 +248,49 @@ def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]):
rule['replace'] = re.compile(rule['replace'])
rules[idx] = ( rule['replace'], rule['with'] )
+def processOutputLayer(ds : gdal.Dataset,
+ layername : str,
+ layerdef : dict[str,Any],
+ srs : Optional[osr.SpatialReference] = None,
+ cachedir : Path|None = None,
+ extent : ogr.Geometry|None = None,
+ dsTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> bool:
+ """Process an output layer, and return a boolean indicating whether
+ the import was sucessful (or skipped), or not."""
+
+ logging.info('Processing output layer "%s"', layername)
+ lyr = ds.GetLayerByName(layername)
+ if lyr is None:
+ raise RuntimeError(f'Failed to create output layer "{layername}"??')
+ if not lyr.TestCapability(ogr.OLCSequentialWrite):
+ raise RuntimeError(f'Output layer "{layername}" has no working '
+ 'CreateFeature() method')
+
+ sources = layerdef['sources']
+ if not (lyrcache is None or force or
+ areSourceFilesNewer(layername, sources=sources,
+ lyrcache=lyrcache,
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ return True
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
+
+ description = layerdef.get('description', None)
+ if (description is not None and
+ lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+ logging.warning('Could not set description metadata')
+
+ # setup output field mapping in the sources dictionary
+ setOutputFieldMap(lyr.GetLayerDefn(), sources)
+
+ return importSources(dso=ds, lyr=lyr, sources=sources,
+ cachedir=cachedir, extent=extent,
+ dsoTransaction=dsTransaction,
+ lyrcache=lyrcache)
+
def validate_sources(layers : dict[str, Any]) -> None:
"""Mangle and validate layer sources and import definitions"""
toremove = set()
@@ -381,7 +422,7 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
def areSourceFilesNewer(layername : str,
sources : dict[str,Any],
- cache : ogr.Layer,
+ lyrcache : ogr.Layer,
cachedir : Optional[Path] = None) -> bool:
"""Return a boolean indicating whether the layer cache is up to date with
respect to the source files found on disk. That is, the last modification
@@ -416,10 +457,10 @@ def areSourceFilesNewer(layername : str,
assert t is not None
attributeFilter = 'layername = ' + escape_literal_str(layername)
- logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
- cache.SetAttributeFilter(attributeFilter)
+ logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
+ lyrcache.SetAttributeFilter(attributeFilter)
- feature = cache.GetNextFeature()
+ feature = lyrcache.GetNextFeature()
if feature is None:
# not in cache
return True
@@ -458,7 +499,7 @@ def areSourceFilesNewer(layername : str,
dt.isoformat(timespec='microseconds'))
ret = int(dt.timestamp() * 1000000.) * 1000 < t
- if cache.GetNextFeature() is not None:
+ if lyrcache.GetNextFeature() is not None:
raise RuntimeError(f'Duplicate key {layername}')
if not ret:
@@ -512,34 +553,6 @@ def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
for path in toremove:
lock_fds.pop(path)
-def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
- """Either commit or rollback the transaction, depending on a condition
- indicating whether there are pending new changes to be committed."""
- if commit:
- logging.debug('Committing transaction')
- if obj.CommitTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not commit transaction')
- else:
- logging.info('No changes detected, rolling back transaction')
- # don't call rollbackTransactionTry() as we don't want to catch exceptions
- if obj.RollbackTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not rollback transaction')
- return False
-
-def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
- """Try to rollback the current transaction on the dataset or layer, ignoring
- any exception but logging errors and returning a boolean indicating
- success."""
- try:
- if obj.RollbackTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not rollback transaction')
- except Exception: # pylint: disable=broad-exception-caught
- logging.exception('Could not rollback transaction')
- return False
-
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -626,129 +639,52 @@ def main() -> NoReturn:
# validate layer cache
lyr_cache = config['dataset'].get('layercache', None)
- if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+ if lyr_cache is None:
+ pass
+ elif validateCacheLayer(dso, lyr_cache):
+ lyr_cache = dso.GetLayerByName(lyr_cache)
+ else:
if not args.force:
logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
args.force = True
lyr_cache = None
- bChanged = False
rv = 0
try:
for layername, layerdef in layers.items():
- logging.info('Processing output layer "%s"', layername)
- lyr = dso.GetLayerByName(layername)
- if lyr is None:
- raise RuntimeError(f'Failed to create output layer "{layername}"??')
- if not lyr.TestCapability(ogr.OLCSequentialWrite):
- raise RuntimeError(f'Output layer "{layername}" has no working '
- 'CreateFeature() method')
-
- sources = layerdef['sources']
- if not (lyr_cache is None or args.force or
- areSourceFilesNewer(layername, sources=sources,
- cache=dso.GetLayerByName(lyr_cache),
- cachedir=cachedir)):
- logging.info('Output layer "%s" is up to date, skipping', layername)
- continue
-
- validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
-
- # setup output field mapping in the sources dictionary
- setOutputFieldMap(lyr.GetLayerDefn(), sources)
-
- if dsoTransaction:
- lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
- logging.debug(lyrTransaction)
- dso.ExecuteSQL(lyrTransaction)
- elif lyr.TestCapability(ogr.OLCTransactions):
- # start transaction if possible
- logging.debug('Starting transaction')
- lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
- else:
- logging.warning('Unsafe update, output layer "%s" does not support transactions',
- layername)
- lyrTransaction = False
-
- try:
- description = layerdef.get('description', None)
- if (description is not None and
- lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
- logging.warning('Could not set description metadata')
-
- now = datetime.now().astimezone()
- clearLayer(dso, lyr) # TODO conditional (only if not new)?
- importSources(lyr_dst=lyr, sources=sources,
- cachedir=cachedir, extent=extent)
-
- # force the PG driver to call EndCopy() to detect errors and trigger a
- # rollback if needed
- dso.FlushCache()
-
- if lyr_cache is None:
- bChanged0 = True
- else:
- updateLayerCache(layername=layername,
- cache=dso.GetLayerByName(lyr_cache),
- last_updated=now)
- bChanged0 = True # TODO fingerprint lyr to detect changes
- dso.FlushCache()
- bChanged = bChanged or bChanged0
-
- if isinstance(lyrTransaction, str):
- if not (bChanged0 or args.force):
- logging.info('No changes detected, rolling back to previous savepoint')
- query = 'ROLLBACK TO ' + lyrTransaction
- logging.debug(query)
- try:
- dso.ExecuteSQL(query)
- except Exception: # pylint: disable=broad-exception-caught
- logging.exception('Could not %s', query)
- rv = 1
- elif isinstance(lyrTransaction, bool) and lyrTransaction:
- lyrTransaction = False
- if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
- rv = 1
-
- except Exception: # pylint: disable=broad-exception-caught
- if isinstance(lyrTransaction, str):
- query = 'ROLLBACK TO ' + lyrTransaction
- logging.exception('Exception occured within transaction, '
- 'rolling back to previous savepoint')
- logging.debug(query)
- dso.ExecuteSQL(query)
- elif isinstance(lyrTransaction, bool) and lyrTransaction:
- logging.exception('Exception occured within transaction, rolling back')
- rollbackTransactionTry(lyr)
- else:
- traceback.print_exc()
+ if not processOutputLayer(dso, layername, layerdef,
+ srs=srs,
+ cachedir=cachedir,
+ extent=extent,
+ dsTransaction=dsoTransaction,
+ lyrcache=lyr_cache,
+ force=args.force):
rv = 1
- finally:
- lyr = None # close output layer
- if isinstance(lyrTransaction, str):
- query = 'RELEASE ' + lyrTransaction
- logging.debug(query)
- dso.ExecuteSQL(query)
-
if sourcePathLocks is not None:
releaseSourcePathLocks(sourcePathLocks)
if dsoTransaction:
- # commit transaction
dsoTransaction = False
- if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
+ logging.debug('Committing transaction')
+ if dso.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
rv = 1
except Exception: # pylint: disable=broad-exception-caught
+ rv = 1
if dsoTransaction:
logging.exception('Exception occured within transaction, rolling back')
- rollbackTransactionTry(lyr)
+ try:
+ if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction')
else:
traceback.print_exc()
- sys.exit(1)
finally:
+ lyr_cache = None
dso = None
srs = None
extent = None