aboutsummaryrefslogtreecommitdiffstats
path: root/import_source.py
diff options
context:
space:
mode:
Diffstat (limited to 'import_source.py')
-rw-r--r--import_source.py100
1 files changed, 70 insertions, 30 deletions
diff --git a/import_source.py b/import_source.py
index 13a8e6c..1271981 100644
--- a/import_source.py
+++ b/import_source.py
@@ -26,7 +26,7 @@ import re
from fnmatch import fnmatchcase
from pathlib import Path
from datetime import datetime, timedelta, UTC
-from typing import Any, Final, Iterator, Optional
+from typing import Any, Callable, Final, Iterator, Optional
import traceback
from enum import Enum, unique as enum_unique
from hashlib import sha256
@@ -37,6 +37,7 @@ from osgeo.gdalconst import (
OF_ALL as GDAL_OF_ALL,
OF_READONLY as GDAL_OF_READONLY,
OF_UPDATE as GDAL_OF_UPDATE,
+ OF_VECTOR as GDAL_OF_VECTOR,
OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR,
DCAP_CREATE as GDAL_DCAP_CREATE,
)
@@ -46,7 +47,6 @@ from common import BadConfiguration, escape_identifier, escape_literal_str
from common_gdal import (
gdalSetOpenExArgs,
gdalGetMetadataItem,
- gdalVersionMin,
formatTZFlag,
getSpatialFilterFromGeometry,
)
@@ -56,7 +56,8 @@ def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
create-options is a non-empty dictionary."""
path = def_dict['path']
- kwargs, drv = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
+ kwargs, drv = gdalSetOpenExArgs(def_dict,
+ flags=GDAL_OF_VECTOR|GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
try:
logging.debug('OpenEx(%s, %s)', path, str(kwargs))
return gdal.OpenEx(path, **kwargs)
@@ -148,9 +149,6 @@ def createOutputLayer(ds : gdal.Dataset,
lyr = ds.CreateLayer(layername, **kwargs)
if lyr is None:
raise RuntimeError(f'Could not create destination layer "{layername}"')
- # TODO use CreateLayerFromGeomFieldDefn() from ≥v3.9 as it's not
- # possible to toggle the geomfield's nullable property after fact
- # otherwise
fields = options['fields']
if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField):
@@ -216,9 +214,20 @@ def createOutputLayer(ds : gdal.Dataset,
defn.SetUnique(v)
if lyr.CreateField(defn, approx_ok=False) != gdalconst.CE_None:
- raise RuntimeError('Could not create field "{fldName}"')
+ raise RuntimeError(f'Could not create field "{fldName}"')
logging.debug('Added field "%s" to output layer "%s"', fldName, layername)
+ if lyr.TestCapability(ogr.OLCAlterGeomFieldDefn):
+ # it appears using .CreateLayerFromGeomFieldDefn() on a non-nullable
+ # GeomFieldDefn doesn't do anything, so we alter it after the fact instead
+ # (GPKG doesn't support this, use GEOMETRY_NULLABLE=NO in layer creation
+ # options instead)
+ flags = drv.GetMetadataItem(gdal.DMD_ALTER_GEOM_FIELD_DEFN_FLAGS)
+ if flags is not None and 'nullable' in flags.lower().split(' '):
+ geom_field = ogr.GeomFieldDefn(None, geom_type)
+ geom_field.SetNullable(False)
+ lyr.AlterGeomFieldDefn(0, geom_field, ogr.ALTER_GEOM_FIELD_DEFN_NULLABLE_FLAG)
+
# sync before calling StartTransaction() so we're not trying to roll back changes
# on a non-existing table
lyr.SyncToDisk()
@@ -252,15 +261,29 @@ def validateOutputLayer(lyr : ogr.Layer,
layerDefn = lyr.GetLayerDefn()
n = layerDefn.GetGeomFieldCount()
if n != 1:
+ if n == 0:
+ raise RuntimeError(f'Output layer "{lyr.GetName()}" has no geometry fields')
logging.warning('Output layer "%s" has %d != 1 geometry fields', lyr.GetName(), n)
- geom_type1 = lyr.GetGeomType()
- geom_type2 = options['geometry-type']
- if geom_type1 != geom_type2:
+ iGeomField = 0
+ geomField = layerDefn.GetGeomFieldDefn(iGeomField)
+ geomType = geomField.GetType()
+ logging.debug('Geometry column #%d: name="%s\", type="%s", srs=%s, nullable=%s',
+ iGeomField, geomField.GetName(),
+ ogr.GeometryTypeToName(geomType),
+ '-' if geomField.GetSpatialRef() is None
+ else '"' + geomField.GetSpatialRef().GetName() + '"',
+ bool(geomField.IsNullable()))
+ if geomField.IsNullable():
+ logging.warning('Geometry column #%d "%s" of output layer "%s" is nullable',
+ iGeomField, geomField.GetName(), lyr.GetName())
+
+ geomType2 = options['geometry-type']
+ if geomType != geomType2:
logging.warning('Output layer "%s" has geometry type #%d (%s), expected #%d (%s)',
lyr.GetName(),
- geom_type1, ogr.GeometryTypeToName(geom_type1),
- geom_type2, ogr.GeometryTypeToName(geom_type2))
+ geomType, ogr.GeometryTypeToName(geomType),
+ geomType2, ogr.GeometryTypeToName(geomType2))
ok = False
fields = options.get('fields', None)
@@ -362,7 +385,7 @@ def validateOutputLayer(lyr : ogr.Layer,
return ok
-def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None:
+def clearLayer(lyr : ogr.Layer) -> None:
"""Clear the given layer (wipe all its features)"""
n = -1
if lyr.TestCapability(ogr.OLCFastFeatureCount):
@@ -372,7 +395,7 @@ def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None:
return
layername_esc = escape_identifier(lyr.GetName())
- # XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer
+ ds = lyr.GetDataset()
drv = ds.GetDriver()
if drv.ShortName == 'PostgreSQL':
# https://www.postgresql.org/docs/15/sql-truncate.html
@@ -453,7 +476,7 @@ class ImportStatus(Enum):
return self.name.removeprefix('IMPORT_')
# pylint: disable-next=dangerous-default-value
-def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
+def importSources(lyr : ogr.Layer,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
extent : ogr.Geometry|None = None,
@@ -462,6 +485,7 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
force : bool = False) -> ImportStatus:
"""Clear lyr and import source layers to it."""
+ dso = lyr.GetDataset()
layername = lyr.GetName()
if dsoTransaction:
# declare a SAVEPOINT (nested transaction) within the DS-level transaction
@@ -482,13 +506,14 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
rv = ImportStatus.IMPORT_NOCHANGE
now = datetime.now().astimezone()
try:
- clearLayer(dso, lyr) # TODO conditional (only if not new)?
+ clearLayer(lyr) # TODO conditional (only if not new)?
for source in sources:
- _importSource(lyr, **source['source'],
+ importSource0(lyr, **source['source'],
args=source['import'],
cachedir=cachedir,
- extent=extent)
+ extent=extent,
+ callback=_importSource2)
# force the PG driver to call EndCopy() to detect errors and trigger a
# rollback if needed
@@ -497,7 +522,7 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
if lyrcache is None:
rv = ImportStatus.IMPORT_SUCCESS
elif updateLayerCache(cache=lyrcache,
- ds=dso, lyr=lyr,
+ lyr=lyr,
force=force,
lyrTransaction=lyrTransaction,
last_updated=now):
@@ -550,15 +575,17 @@ def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
return rv
# pylint: disable-next=dangerous-default-value
-def _importSource(lyr : ogr.Layer,
+def importSource0(lyr : ogr.Layer|None = None,
path : str = '/nonexistent',
unar : dict[str,Any]|None = None,
args : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
+ extent : ogr.Geometry|None = None,
+ callback : Callable[[ogr.Layer|None, str, dict[str,Any], Path|None,
+ ogr.Geometry|None], None]|None = None) -> None:
"""Import a source layer"""
if unar is None:
- return _importSource2(lyr, path, args=args, basedir=cachedir, extent=extent)
+ return callback(lyr, path, args=args, basedir=cachedir, extent=extent)
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
@@ -574,7 +601,7 @@ def _importSource(lyr : ogr.Layer,
fmt=unar.get('format', None),
patterns=unar.get('patterns', None),
exact_matches=[ds_srcpath])
- return _importSource2(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent)
+ return callback(lyr, ds_srcpath, args=args, basedir=Path(tmpdir), extent=extent)
def setFieldMapValue(fld : ogr.FieldDefn,
idx : int,
@@ -613,7 +640,7 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403
while we want a single transaction for the entire destination layer,
including truncation, source imports, and metadata changes."""
- kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
+ kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_VECTOR|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
path2 = path if basedir is None else str(basedir.joinpath(path))
logging.debug('OpenEx(%s, %s)', path2, str(kwargs))
@@ -758,6 +785,15 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
if nullReplacement is not None or len(mapping) > 0:
valueMap.append( (i, nullReplacement, mapping) )
+ if args.get('rstrip-strings', False):
+ stringFieldsIdx = [ i for i in range(fieldCount)
+ if defn.GetFieldDefn(i).GetType() == ogr.OFTString and
+ fieldMap[i] >= 0 ]
+ logging.debug('Source field indices to rstrip: %s', str(stringFieldsIdx))
+ bStringFields = len(stringFieldsIdx) > 0
+ else:
+ bStringFields = False
+
bValueMap = len(valueMap) > 0
defn = None
@@ -774,6 +810,12 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
mismatch = {}
feature = lyr.GetNextFeature()
while feature is not None:
+ if bStringFields:
+ for i in stringFieldsIdx:
+ if feature.IsFieldSetAndNotNull(i):
+ v = feature.GetField(i)
+ feature.SetField(i, v.rstrip())
+
if bValueMap:
for i, nullReplacement, mapping in valueMap:
if not feature.IsFieldSet(i):
@@ -893,7 +935,7 @@ def listFieldsOrderBy(defn : ogr.FeatureDefn,
yield c
# pylint: disable-next=too-many-branches, too-many-statements
-def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
+def updateLayerCache(lyr : ogr.Layer, cache : ogr.Layer,
last_updated : datetime,
lyrTransaction : str|bool|None = None,
force : bool = False) -> bool:
@@ -941,6 +983,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
struct_dgst : Final = struct.Struct('@qq').pack
logging.debug('%s', query)
+ ds = lyr.GetDataset()
lyr2 = ds.ExecuteSQL(query)
try:
assert lyr2.GetLayerDefn().GetFieldDefn(0).GetName() == 'hash_properties'
@@ -973,9 +1016,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
fingerprint_old = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
assert cache.GetNextFeature() is None
- if not gdalVersionMin(maj=3, min=8):
- tzFlag = 0 # ogr.TZFLAG_UNKNOWN
- elif last_updated.tzinfo == UTC:
+ if last_updated.tzinfo == UTC:
tzFlag = ogr.TZFLAG_UTC
else:
td = last_updated.utcoffset()
@@ -1036,8 +1077,7 @@ def updateLayerCache(ds : gdal.Dataset, lyr : ogr.Layer, cache : ogr.Layer,
logging.info('Updated layer "%s" has identical fingerprint %s',
layername, fingerprint.hex()[:8])
- # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
- if cache.SetFeature(feature) != ogr.OGRERR_NONE:
+ if cache.UpdateFeature(feature, [1,2], [], False) != ogr.OGRERR_NONE:
raise RuntimeError('Could not update feature in layer cache')
else:
if cache.CreateFeature(feature) != ogr.OGRERR_NONE: