aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--import_source.py103
-rwxr-xr-xwebmap-import208
2 files changed, 165 insertions, 146 deletions
diff --git a/import_source.py b/import_source.py
index 8ce69a9..2d2f116 100644
--- a/import_source.py
+++ b/import_source.py
@@ -27,6 +27,7 @@ from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
from typing import Any, Optional
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -433,16 +434,93 @@ def listArchiveMembers(namelist : list[str],
return members
# pylint: disable-next=dangerous-default-value
-def importSources(lyr_dst : ogr.Layer,
+def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers to lyr_dst."""
- for source in sources:
- _importSource(lyr_dst, **source['source'],
- args=source['import'],
- cachedir=cachedir,
- extent=extent)
+ extent : ogr.Geometry|None = None,
+ dsoTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None) -> bool:
+ """Clear lyr and import source layers to it."""
+
+ layername = lyr.GetName()
+ if dsoTransaction:
+ # declare a SAVEPOINT (nested transaction) within the DS-level transaction
+ lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
+ logging.debug('%s', lyrTransaction)
+ dso.ExecuteSQL(lyrTransaction)
+ elif lyr.TestCapability(ogr.OLCTransactions):
+ # try to start transaction on the layer
+ logging.debug('Starting transaction on output layer "%s"', layername)
+ lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
+ if not lyrTransaction:
+ logging.warning('Couldn\'t start transaction on output layer "%s"', layername)
+ else:
+ logging.warning('Unsafe update, output layer "%s" doesn\'t support transactions',
+ layername)
+ lyrTransaction = False
+
+ rv = True
+ now = datetime.now().astimezone()
+ try:
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
+
+ for source in sources:
+ _importSource(lyr, **source['source'],
+ args=source['import'],
+ cachedir=cachedir,
+ extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ dso.FlushCache()
+
+ if lyrcache is not None:
+ updateLayerCache(layername=layername,
+ cache=lyrcache,
+ ds=dso,
+ savepoint=lyrTransaction if isinstance(lyrTransaction,str) else None,
+ last_updated=now)
+
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ if isinstance(lyrTransaction, str):
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.exception('Exception occured within transaction: %s', query)
+ # don't unset lyrTransaction here as we want to RELEASE SAVEPOINT
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.exception('Exception occured within transaction on output '
+ 'layer "%s": ROLLBACK', layername)
+ lyrTransaction = None
+ try:
+ if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction on layer "%s"', layername)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction on layer "%s"', layername)
+ else:
+ traceback.print_exc()
+
+ finally:
+ if isinstance(lyrTransaction, str):
+ query = 'RELEASE ' + lyrTransaction
+ logging.debug('%s', query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not execute SQL: %s', query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ try:
+ if lyr.CommitTransaction() != ogr.OGRERR_NONE:
+ rv = False
+ logging.error('Could not commit transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = False
+ logging.exception('Could not commit transaction on layer "%s"', layername)
+
+ return rv
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
@@ -769,8 +847,9 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
logging.info('Forced conversion to %s: %s',
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-def updateLayerCache(layername : str, cache : ogr.Layer,
- last_updated : datetime) -> None:
+def updateLayerCache(ds : gdal.Dataset, layername : str, cache : ogr.Layer,
+ last_updated : datetime,
+ savepoint : str|None = None) -> None:
"""Update attributes in the layer cache for the given layer name."""
attributeFilter = 'layername = ' + escape_literal_str(layername)
logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
@@ -816,3 +895,7 @@ def updateLayerCache(layername : str, cache : ogr.Layer,
else:
if cache.CreateFeature(feature) != ogr.OGRERR_NONE:
raise RuntimeError('Could not create new feature in layer cache')
+
+ # force the PG driver to call EndCopy() to detect errors and trigger a
+ # rollback if needed
+ ds.FlushCache()
diff --git a/webmap-import b/webmap-import
index 0bf9c9e..1d3f4ec 100755
--- a/webmap-import
+++ b/webmap-import
@@ -34,7 +34,7 @@ from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
-from osgeo import gdal, ogr
+from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
CE_None as GDAL_CE_None,
DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
@@ -64,9 +64,7 @@ from import_source import (
openOutputDS,
createOutputLayer,
validateOutputLayer,
- clearLayer,
- importSources,
- updateLayerCache
+ importSources
)
def setFieldIf(cond : bool,
@@ -250,6 +248,49 @@ def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]):
rule['replace'] = re.compile(rule['replace'])
rules[idx] = ( rule['replace'], rule['with'] )
+def processOutputLayer(ds : gdal.Dataset,
+ layername : str,
+ layerdef : dict[str,Any],
+ srs : Optional[osr.SpatialReference] = None,
+ cachedir : Path|None = None,
+ extent : ogr.Geometry|None = None,
+ dsTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> bool:
+ """Process an output layer, and return a boolean indicating whether
+ the import was sucessful (or skipped), or not."""
+
+ logging.info('Processing output layer "%s"', layername)
+ lyr = ds.GetLayerByName(layername)
+ if lyr is None:
+ raise RuntimeError(f'Failed to create output layer "{layername}"??')
+ if not lyr.TestCapability(ogr.OLCSequentialWrite):
+ raise RuntimeError(f'Output layer "{layername}" has no working '
+ 'CreateFeature() method')
+
+ sources = layerdef['sources']
+ if not (lyrcache is None or force or
+ areSourceFilesNewer(layername, sources=sources,
+ lyrcache=lyrcache,
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ return True
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
+
+ description = layerdef.get('description', None)
+ if (description is not None and
+ lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+ logging.warning('Could not set description metadata')
+
+ # setup output field mapping in the sources dictionary
+ setOutputFieldMap(lyr.GetLayerDefn(), sources)
+
+ return importSources(dso=ds, lyr=lyr, sources=sources,
+ cachedir=cachedir, extent=extent,
+ dsoTransaction=dsTransaction,
+ lyrcache=lyrcache)
+
def validate_sources(layers : dict[str, Any]) -> None:
"""Mangle and validate layer sources and import definitions"""
toremove = set()
@@ -381,7 +422,7 @@ def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
def areSourceFilesNewer(layername : str,
sources : dict[str,Any],
- cache : ogr.Layer,
+ lyrcache : ogr.Layer,
cachedir : Optional[Path] = None) -> bool:
"""Return a boolean indicating whether the layer cache is up to date with
respect to the source files found on disk. That is, the last modification
@@ -416,10 +457,10 @@ def areSourceFilesNewer(layername : str,
assert t is not None
attributeFilter = 'layername = ' + escape_literal_str(layername)
- logging.debug('SetAttributeFilter("%s", "%s")', cache.GetName(), attributeFilter)
- cache.SetAttributeFilter(attributeFilter)
+ logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
+ lyrcache.SetAttributeFilter(attributeFilter)
- feature = cache.GetNextFeature()
+ feature = lyrcache.GetNextFeature()
if feature is None:
# not in cache
return True
@@ -458,7 +499,7 @@ def areSourceFilesNewer(layername : str,
dt.isoformat(timespec='microseconds'))
ret = int(dt.timestamp() * 1000000.) * 1000 < t
- if cache.GetNextFeature() is not None:
+ if lyrcache.GetNextFeature() is not None:
raise RuntimeError(f'Duplicate key {layername}')
if not ret:
@@ -512,34 +553,6 @@ def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
for path in toremove:
lock_fds.pop(path)
-def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
- """Either commit or rollback the transaction, depending on a condition
- indicating whether there are pending new changes to be committed."""
- if commit:
- logging.debug('Committing transaction')
- if obj.CommitTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not commit transaction')
- else:
- logging.info('No changes detected, rolling back transaction')
- # don't call rollbackTransactionTry() as we don't want to catch exceptions
- if obj.RollbackTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not rollback transaction')
- return False
-
-def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
- """Try to rollback the current transaction on the dataset or layer, ignoring
- any exception but logging errors and returning a boolean indicating
- success."""
- try:
- if obj.RollbackTransaction() == ogr.OGRERR_NONE:
- return True
- logging.error('Could not rollback transaction')
- except Exception: # pylint: disable=broad-exception-caught
- logging.exception('Could not rollback transaction')
- return False
-
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -626,129 +639,52 @@ def main() -> NoReturn:
# validate layer cache
lyr_cache = config['dataset'].get('layercache', None)
- if lyr_cache is not None and not validateCacheLayer(dso, lyr_cache):
+ if lyr_cache is None:
+ pass
+ elif validateCacheLayer(dso, lyr_cache):
+ lyr_cache = dso.GetLayerByName(lyr_cache)
+ else:
if not args.force:
logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
args.force = True
lyr_cache = None
- bChanged = False
rv = 0
try:
for layername, layerdef in layers.items():
- logging.info('Processing output layer "%s"', layername)
- lyr = dso.GetLayerByName(layername)
- if lyr is None:
- raise RuntimeError(f'Failed to create output layer "{layername}"??')
- if not lyr.TestCapability(ogr.OLCSequentialWrite):
- raise RuntimeError(f'Output layer "{layername}" has no working '
- 'CreateFeature() method')
-
- sources = layerdef['sources']
- if not (lyr_cache is None or args.force or
- areSourceFilesNewer(layername, sources=sources,
- cache=dso.GetLayerByName(lyr_cache),
- cachedir=cachedir)):
- logging.info('Output layer "%s" is up to date, skipping', layername)
- continue
-
- validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
-
- # setup output field mapping in the sources dictionary
- setOutputFieldMap(lyr.GetLayerDefn(), sources)
-
- if dsoTransaction:
- lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
- logging.debug(lyrTransaction)
- dso.ExecuteSQL(lyrTransaction)
- elif lyr.TestCapability(ogr.OLCTransactions):
- # start transaction if possible
- logging.debug('Starting transaction')
- lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
- else:
- logging.warning('Unsafe update, output layer "%s" does not support transactions',
- layername)
- lyrTransaction = False
-
- try:
- description = layerdef.get('description', None)
- if (description is not None and
- lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
- logging.warning('Could not set description metadata')
-
- now = datetime.now().astimezone()
- clearLayer(dso, lyr) # TODO conditional (only if not new)?
- importSources(lyr_dst=lyr, sources=sources,
- cachedir=cachedir, extent=extent)
-
- # force the PG driver to call EndCopy() to detect errors and trigger a
- # rollback if needed
- dso.FlushCache()
-
- if lyr_cache is None:
- bChanged0 = True
- else:
- updateLayerCache(layername=layername,
- cache=dso.GetLayerByName(lyr_cache),
- last_updated=now)
- bChanged0 = True # TODO fingerprint lyr to detect changes
- dso.FlushCache()
- bChanged = bChanged or bChanged0
-
- if isinstance(lyrTransaction, str):
- if not (bChanged0 or args.force):
- logging.info('No changes detected, rolling back to previous savepoint')
- query = 'ROLLBACK TO ' + lyrTransaction
- logging.debug(query)
- try:
- dso.ExecuteSQL(query)
- except Exception: # pylint: disable=broad-exception-caught
- logging.exception('Could not %s', query)
- rv = 1
- elif isinstance(lyrTransaction, bool) and lyrTransaction:
- lyrTransaction = False
- if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
- rv = 1
-
- except Exception: # pylint: disable=broad-exception-caught
- if isinstance(lyrTransaction, str):
- query = 'ROLLBACK TO ' + lyrTransaction
- logging.exception('Exception occured within transaction, '
- 'rolling back to previous savepoint')
- logging.debug(query)
- dso.ExecuteSQL(query)
- elif isinstance(lyrTransaction, bool) and lyrTransaction:
- logging.exception('Exception occured within transaction, rolling back')
- rollbackTransactionTry(lyr)
- else:
- traceback.print_exc()
+ if not processOutputLayer(dso, layername, layerdef,
+ srs=srs,
+ cachedir=cachedir,
+ extent=extent,
+ dsTransaction=dsoTransaction,
+ lyrcache=lyr_cache,
+ force=args.force):
rv = 1
- finally:
- lyr = None # close output layer
- if isinstance(lyrTransaction, str):
- query = 'RELEASE ' + lyrTransaction
- logging.debug(query)
- dso.ExecuteSQL(query)
-
if sourcePathLocks is not None:
releaseSourcePathLocks(sourcePathLocks)
if dsoTransaction:
- # commit transaction
dsoTransaction = False
- if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
+ logging.debug('Committing transaction')
+ if dso.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
rv = 1
except Exception: # pylint: disable=broad-exception-caught
+ rv = 1
if dsoTransaction:
logging.exception('Exception occured within transaction, rolling back')
- rollbackTransactionTry(lyr)
+ try:
+ if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction')
else:
traceback.print_exc()
- sys.exit(1)
finally:
+ lyr_cache = None
dso = None
srs = None
extent = None