aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.pylintrc1
-rw-r--r--common.py157
-rwxr-xr-xwebmap-download42
-rwxr-xr-xwebmap-import713
4 files changed, 516 insertions, 397 deletions
diff --git a/.pylintrc b/.pylintrc
index 54b0100..b8d6144 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -5,3 +5,4 @@ max-locals = 50
max-branches = 25
max-statements = 100
max-nested-blocks = 10
+max-module-lines = 2000
diff --git a/common.py b/common.py
index acbb5d8..0bece11 100644
--- a/common.py
+++ b/common.py
@@ -56,6 +56,13 @@ class MissingConfiguration(Exception):
def __init__(self, name : str) -> Never:
super().__init__(f'Could not find configuration file {name}')
+class BadConfiguration(Exception):
+ """Exception raised when there is a bad configuration"""
+ def __init__(self, message : str, config_path : Optional[Path] = None) -> Never:
+ if config_path is not None:
+ message = str(config_path) + ': ' + message
+ super().__init__(message)
+
def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
"""Return the configuration file path"""
dirs = [
@@ -77,8 +84,16 @@ def parse_config(path : Optional[Path] = None,
with config_path.open(mode='r', encoding='utf-8') as fp:
config = yaml.safe_load(fp)
- # filter layers that are not of interest
layers = config.get('layers', {})
+ for name, layerdefs in layers.items():
+ if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
+ layers[name] = { 'sources': [layerdefs] }
+ for k in ('description', 'create', 'publish'):
+ if k in layerdefs:
+ layers[name][k] = layerdefs.pop(k)
+ layerdefs = layers[name]
+
+ # filter layers that are not of interest
if groupnames is not None:
layernames = []
layer_groups = config.get('layer-groups', {})
@@ -123,8 +138,8 @@ def parse_config(path : Optional[Path] = None,
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception('Configured extent without SRS')
+ raise BadConfiguration('Configured extent without SRS',
+ config_path=config_path)
return config
@@ -149,142 +164,6 @@ def format_time(ts : float, precision : int = 3) -> str:
h, m = divmod(m, 60)
return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
-# pylint: disable-next=invalid-name, redefined-builtin
-def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool:
- """Return a boolean indicating whether the installer GDAL version is
- greater than or equal to the provider (maj, min, rev) triplet."""
-
- if maj < 1 or (maj == 1 and min < 10):
- # GDAL_VERSION_NUM() macro was changed in 1.10. That version
- # was released in 2013 so we blindly assume the installer
- # version is more recent
- return True
-
- version_cur = int(gdal.VersionInfo())
- # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
- version_min = maj*1000000 + min*10000 + rev*100
- return version_min <= version_cur
-
-# pylint: disable-next=invalid-name
-def gdalGetMetadataItem(obj, k : str) -> bool:
- """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
-
- v = obj.GetMetadataItem(k)
- if v is not None and isinstance(v, str):
- return v.upper() == 'YES'
-
- return False
-
-def escape_identifier(identifier : str) -> str:
- """Escape the given identifier, cf.
- swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
-
- if '\x00' in identifier:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Invalid identifier "{identifier}"')
-
- # SQL:1999 delimited identifier
- return '"' + identifier.replace('"', '""') + '"'
-
-# pylint: disable-next=invalid-name,dangerous-default-value
-def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0):
- """Return a pair kwargs and driver to use with gdal.OpenEx()."""
-
- kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }
-
- fmt = option_dict.get('format', None)
- if fmt is None:
- drv = None
- else:
- drv = gdal.GetDriverByName(fmt)
- if drv is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Unknown driver name "{fmt}"')
- if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
- kwargs['allowed_drivers'] = [ drv.ShortName ]
-
- oo = option_dict.get('open-options', None)
- if oo is not None:
- kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
- return kwargs, drv
-
-# pylint: disable-next=invalid-name
-def getSRS(osr, srs_str : Optional[str]):
- """Return the decoded Spatial Reference System."""
-
- if srs_str is None:
- return None
-
- srs = osr.SpatialReference()
- if srs_str.startswith('EPSG:'):
- code = int(srs_str.removeprefix('EPSG:'))
- srs.ImportFromEPSG(code)
- else:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Unknown SRS {srs_str}')
-
- logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
- return srs
-
-# pylint: disable-next=invalid-name
-def getExtent(extent : tuple[float, float, float, float] | list[float], srs = None):
- """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
- given SRS. Return a pair with the densified and non-densified extent.
- Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
- polygon to make sure it is sufficiently densified when transforming to
- source layer SRS for spatial filtering."""
-
- if extent is None:
- return None, None
-
- if not isinstance(extent, (list, tuple)) or len(extent) != 4:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Invalid extent {extent}')
- if srs is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception('Configured extent but no SRS')
-
- logging.debug('Configured extent in %s: %s',
- srs.GetName(), ', '.join(map(str, extent)))
-
- from osgeo import ogr, osr # pylint: disable=import-outside-toplevel
- ogr.UseExceptions()
-
- ring = ogr.Geometry(ogr.wkbLinearRing)
- ring.AddPoint_2D(extent[0], extent[1])
- ring.AddPoint_2D(extent[2], extent[1])
- ring.AddPoint_2D(extent[2], extent[3])
- ring.AddPoint_2D(extent[0], extent[3])
- ring.AddPoint_2D(extent[0], extent[1])
-
- polygon = ogr.Geometry(ogr.wkbPolygon)
- polygon.AddGeometry(ring)
-
- # we expressed extent as minX, minY, maxX, maxY (easting/northing
- # ordered, i.e., in traditional GIS order)
- srs2 = srs.Clone()
- srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
- polygon.AssignSpatialReference(srs2)
- if not srs2.IsSame(srs):
- polygon.TransformTo(srs)
-
- # densify the rectangle to avoid issues when reprojecting to the
- # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
- polygon_dense = polygon.Clone()
- segment_distance_metre = 10 * 1000
- if srs.IsGeographic():
- # pylint: disable-next=invalid-name
- dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
- polygon_dense.Segmentize(dfMaxLength)
- elif srs.IsProjected():
- # pylint: disable-next=invalid-name
- dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
- polygon_dense.Segmentize(dfMaxLength)
-
- return polygon_dense, polygon
-
######
# The function definitions below are taken from cpython's source code
diff --git a/webmap-download b/webmap-download
index 8897cf4..edb624e 100755
--- a/webmap-download
+++ b/webmap-download
@@ -18,7 +18,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
-# pylint: disable=invalid-name,missing-module-docstring
+# pylint: disable=invalid-name, missing-module-docstring, fixme
# pylint: enable=invalid-name
from os import (
@@ -49,6 +49,7 @@ from typing import Optional, NoReturn, Never
import requests
import common
+from common import BadConfiguration
def download_trystream(url : str, **kwargs) -> requests.Response:
"""GET a url, trying a number of times. Return immediately after the
@@ -170,13 +171,6 @@ def download(dest : str,
common.format_time(elapsed),
common.format_bytes(int(size/elapsed)))
-class BadConfiguration(Exception):
- """Exception raised when there is a bad configuration"""
- def __init__(self, message : str, config_path : Optional[Path] = None) -> Never:
- if config_path is not None:
- message = str(config_path) + ': ' + message
- super().__init__(message)
-
def _check_key_type(k : str, v : str, known_keys : list[type, tuple[set[str]]]) -> bool:
for t, ks in known_keys:
if k in ks and isinstance(v, t):
@@ -235,7 +229,8 @@ def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]:
return destinations
-def main() -> NoReturn: # pylint: disable=missing-function-docstring
+# pylint: disable-next=missing-function-docstring
+def main() -> NoReturn:
common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
parser = argparse.ArgumentParser(description='Download or update GIS layers.')
@@ -254,7 +249,7 @@ def main() -> NoReturn: # pylint: disable=missing-function-docstring
parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
args = parser.parse_args()
- if args.debug > 0:
+ if args.debug > 0: # pylint: disable=duplicate-code
logging.getLogger().setLevel(logging.DEBUG)
if args.debug > 1:
from http.client import HTTPConnection # pylint: disable=import-outside-toplevel
@@ -269,19 +264,22 @@ def main() -> NoReturn: # pylint: disable=missing-function-docstring
rv = 0
download_paths = set()
for layername, layerdef in config.get('layers', {}).items():
- source = layerdef.get('source', None)
- if source is None:
- logging.error('Layer "%s" has no source, ignoring', layername)
- rv = 1
+ sources = layerdef.get('sources', None)
+ if sources is None or len(sources) < 1:
+ logging.warning('Layer "%s" has no source, ignoring', layername)
continue
- path = source.get('path', None)
- if path is None:
- logging.error('Layer "%s" has no source path, ignoring', layername)
- rv = 1
- elif path not in downloads:
- logging.warning('Ignoring unknown source of path "%s" from layer "%s"', path, layername)
- else:
- download_paths.add(path)
+ for idx, source in enumerate(sources):
+ source = source.get('source', None)
+ path = None if source is None else source.get('path', None)
+ if path is None:
+ logging.error('Source #%d of layer "%s" has no path, ignoring',
+ idx, layername)
+ rv = 1
+ elif path not in downloads:
+ logging.warning('Ignoring unknown source of path "%s" from layer "%s"',
+ path, layername)
+ else:
+ download_paths.add(path)
if args.quiet or not sys.stderr.isatty():
pbar = None
diff --git a/webmap-import b/webmap-import
index 13bdcdc..b84d1bd 100755
--- a/webmap-import
+++ b/webmap-import
@@ -2,7 +2,7 @@
#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
-# Copyright © 2024 Guilhem Moulin <info@guilhem.se>
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -18,8 +18,11 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
+# pylint: disable=invalid-name, missing-module-docstring, fixme
+
from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
import os
+import sys
from fcntl import flock, LOCK_EX
import logging
import argparse
@@ -28,6 +31,8 @@ import re
import math
from fnmatch import fnmatchcase
from pathlib import Path
+from typing import Any, Optional, NoReturn
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
@@ -42,17 +47,17 @@ from osgeo.gdalconst import (
DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
-import osgeo.gdalconst as gdalconst
-gdal.UseExceptions()
+from osgeo import gdalconst
import common
-from common import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, escape_identifier
+from common import BadConfiguration
+
+def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
+ """Open and return the output DS. It is created if create=False or
+ create-options is a non-empty dictionary."""
-# Open and return the output DS. It is created if create=False or
-# create-options is a non-empty dictionary.
-def openOutputDS(def_dict):
path = def_dict['path']
- kwargs, drv = gdalSetOpenExArgs(gdal, def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
+ kwargs, drv = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
try:
logging.debug('OpenEx(%s, %s)', path, str(kwargs))
return gdal.OpenEx(path, **kwargs)
@@ -65,7 +70,7 @@ def openOutputDS(def_dict):
dso2 = None
try:
dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL | GDAL_OF_UPDATE)
- except Exception:
+ except RuntimeError:
pass
if dso2 is not None:
# path exists but can't be open with OpenEx(path, **kwargs)
@@ -73,7 +78,7 @@ def openOutputDS(def_dict):
try:
dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL)
- except Exception:
+ except RuntimeError:
pass
if dso2 is not None:
# path exists but can't be open with OpenEx(path, **kwargs)
@@ -89,20 +94,20 @@ def openOutputDS(def_dict):
if 'open_options' in kwargs:
# like ogr2ogr(1)
- logging.warning('Destination\'s open options ignored ' +
- 'when creating the output datasource')
+ logging.warning('Destination\'s open options ignored '
+ 'when creating the output datasource')
kwargs2 = { 'eType': gdal.GDT_Unknown }
if dsco is not None:
kwargs2['options'] = [ k + '=' + str(v) for k, v in dsco.items() ]
logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs2['eType'],
- ', options=' + str(kwargs2['options']) if 'options' in kwargs2 else '')
+ ', options=' + str(kwargs2['options']) if 'options' in kwargs2 else '')
# XXX racy, a GDAL equivalent of O_EXCL would be nice
return drv.Create(path, 0, 0, 0, **kwargs2)
-# cf. ogr/ogrgeometry.cpp:OGRFromOGCGeomType()
-def fromGeomTypeName(name):
+def fromGeomTypeName(name : str) -> int:
+ """Parse a Geometry type name, cf. ogr/ogrgeometry.cpp:OGRFromOGCGeomType()"""
name = name.upper()
isMeasured = False
@@ -160,8 +165,8 @@ def fromGeomTypeName(name):
return eGType
-# Parse geometry type, cf. ogr2ogr_lib.cpp
-def parseGeomType(name):
+def parseGeomType(name : str|None) -> int:
+ """Parse geometry type, cf. ogr2ogr_lib.cpp"""
if name is None:
return ogr.wkbUnknown
name2 = name.upper()
@@ -181,104 +186,105 @@ def parseGeomType(name):
else:
eGType = fromGeomTypeName(name2)
if eGType == ogr.wkbUnknown:
- raise Exception(f'Unknown geometry type "{name}"')
+ raise BadConfiguration(f'Unknown geometry type "{name}"')
if eGType != ogr.wkbNone and is3D:
eGType = ogr.GT_SetZ(eGType)
return eGType
-# cf. ogr/ogr_core.h's enum OGRFieldType;
-def parseFieldType(name):
+
+def parseFieldType(name : str) -> int:
+ """Parse field type, cf. ogr/ogr_core.h's enum OGRFieldType"""
+ # pylint: disable=too-many-return-statements
if name is None:
- raise Exception('parseFieldType(None)')
+ raise RuntimeError('parseFieldType(None)')
+
name2 = name.lower()
if name2 == 'integer':
# simple 32bit integer
return ogr.OFTInteger
- elif name2 == 'integerlist':
+ if name2 == 'integerlist':
# List of 32bit integers
return ogr.OFTIntegerList
- elif name2 == 'real':
+ if name2 == 'real':
# Double Precision floating point
return ogr.OFTReal
- elif name2 == 'reallist':
+ if name2 == 'reallist':
# List of doubles
return ogr.OFTRealList
- elif name2 == 'string':
+ if name2 == 'string':
# String of ASCII chars
return ogr.OFTString
- elif name2 == 'stringlist':
+ if name2 == 'stringlist':
# Array of strings
return ogr.OFTStringList
- elif name2 == 'binary':
+ if name2 == 'binary':
# Raw Binary data
return ogr.OFTBinary
- elif name2 == 'date':
+ if name2 == 'date':
# Date
return ogr.OFTDate
- elif name2 == 'time':
+ if name2 == 'time':
# Time
return ogr.OFTTime
- elif name2 == 'datetime':
+ if name2 == 'datetime':
# Date and Time
return ogr.OFTDateTime
- elif name2 == 'integer64':
+ if name2 == 'integer64':
# Single 64bit integer
return ogr.OFTInteger64
- elif name2 == 'integer64list':
+ if name2 == 'integer64list':
# List of 64bit integers
return ogr.OFTInteger64List
- else:
- raise Exception(f'Unknown field type "{name}"')
+ raise BadConfiguration(f'Unknown field type "{name}"')
-# cf. ogr/ogr_core.h's enum OGRFieldSubType;
-def parseSubFieldType(name):
+def parseSubFieldType(name : str) -> int:
+ """Parse subfield type, cf. ogr/ogr_core.h's enum OGRFieldSubType"""
if name is None:
- raise Exception('parseSubFieldType(None)')
+ raise RuntimeError('parseSubFieldType(None)')
name2 = name.lower()
if name2 == 'none':
# No subtype. This is the default value.
return ogr.OFSTNone
- elif name2 == 'bool':
+ if name2 == 'bool':
# Boolean integer. Only valid for OFTInteger and OFTIntegerList.
return ogr.OFSTBoolean
- elif name2 == 'int16':
+ if name2 == 'int16':
# Signed 16-bit integer. Only valid for OFTInteger and OFTIntegerList.
return ogr.OFSTInt16
- elif name2 == 'float32':
+ if name2 == 'float32':
# Single precision (32 bit) floating point. Only valid for OFTReal and OFTRealList.
return ogr.OFSTFloat32
- elif name2 == 'json':
+ if name2 == 'json':
# JSON content. Only valid for OFTString.
return ogr.OFSTJSON
- elif name2 == 'uuid':
+ if name2 == 'uuid':
# UUID string representation. Only valid for OFTString.
return ogr.OFSTUUID
- else:
- raise Exception(f'Unknown field subtype "{name}"')
+ raise BadConfiguration(f'Unknown field subtype "{name}"')
-# Parse timezone
TZ_RE = re.compile(r'(?:UTC\b)?([\+\-]?)([0-9][0-9]):?([0-9][0-9])', flags=re.IGNORECASE)
-def parseTimeZone(tz):
+def parseTimeZone(tz : str) -> int:
+ """Parse timezone."""
if tz is None:
- raise Exception('parseTimeZone(None)')
+ raise RuntimeError('parseTimeZone(None)')
tz2 = tz.lower()
if tz2 == 'none':
return ogr.TZFLAG_UNKNOWN
- elif tz2 == 'local':
+ if tz2 == 'local':
return ogr.TZFLAG_LOCALTIME
- elif tz2 == 'utc' or tz2 == 'gmt':
+ if tz2 in ('utc', 'gmt'):
return ogr.TZFLAG_UTC
m = TZ_RE.fullmatch(tz)
if m is None:
- raise Exception(f'Invalid timezone "{tz}"')
+ raise BadConfiguration(f'Invalid timezone "{tz}"')
tzSign = m.group(1)
tzHour = int(m.group(2))
tzMinute = int(m.group(3))
if tzHour > 14 or tzMinute >= 60 or tzMinute % 15 != 0:
- raise Exception(f'Invalid timezone "{tz}"')
+ raise BadConfiguration(f'Invalid timezone "{tz}"')
tzFlag = tzHour*4 + int(tzMinute/15)
if tzSign == '-':
tzFlag = 100 - tzFlag
@@ -286,25 +292,31 @@ def parseTimeZone(tz):
tzFlag += 100
return tzFlag
-# Pretty-print timezone flag, cf.
-# ogr/ogrutils.cpp:OGRGetISO8601DateTime()
-def formatTZFlag(tzFlag):
+def formatTZFlag(tzFlag : int) -> str:
+ """Pretty-print timezone flag, cf. ogr/ogrutils.cpp:OGRGetISO8601DateTime()"""
if tzFlag is None:
- raise Exception('printTimeZone(None)')
+ raise RuntimeError('printTimeZone(None)')
if tzFlag == ogr.TZFLAG_UNKNOWN:
return 'none'
- elif tzFlag == ogr.TZFLAG_LOCALTIME:
+ if tzFlag == ogr.TZFLAG_LOCALTIME:
return 'local'
- elif tzFlag == ogr.TZFLAG_UTC:
+ if tzFlag == ogr.TZFLAG_UTC:
return 'UTC'
- tzOffset = abs(tzFlag - 100) * 15;
- tzHour = int(tzOffset / 60);
- tzMinute = int(tzOffset % 60);
+ tzOffset = abs(tzFlag - 100) * 15
+ tzHour = int(tzOffset / 60)
+ tzMinute = int(tzOffset % 60)
tzSign = '+' if tzFlag > 100 else '-'
return f'{tzSign}{tzHour:02}{tzMinute:02}'
-def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning):
+def setFieldIf(cond : bool,
+ attrName : str,
+ val : Any,
+ data : dict[str, Any],
+ fldName : str,
+ drvName : str,
+ log = logging.warning) -> None:
+ """Conditionally set a field"""
if cond:
data[attrName] = val
else:
@@ -315,19 +327,23 @@ def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning)
log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
attrName, val2, fldName, drvName)
-# Validate layer creation options and schema. The schema is modified in
-# place with the parsed result.
-# (We need the driver of the output dataset to determine capability on
-# constraints.)
-def validateSchema(layers, drvo=None, lco_defaults=None):
+# pylint: disable-next=too-many-branches
+def validate_schema(layers : dict[str, Any],
+ drvo : Optional[gdal.Driver] = None,
+ lco_defaults : Optional[dict[str, str]] = None) -> None:
+ """Validate layer creation options and schema. The schema is
+ modified in place with the parsed result.
+ (We need the driver of the output dataset to determine capability on
+ constraints.)"""
+
# Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
- if gdalVersionMin(gdal, maj=3, min=7):
+ if gdalVersionMin(maj=3, min=7):
# list of capability flags supported by the CreateField() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
# GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
- hasTZFlagSupport = gdalVersionMin(gdal, maj=3, min=8)
+ hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
else:
# list of flags supported by the OGRLayer::AlterFieldDefn() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
@@ -338,9 +354,12 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
# cache driver capabilities
drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
- drvoSupportsFieldNullable = 'Nullable' in drvoFieldDefnFlags and gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS)
- drvoSupportsFieldUnique = 'Unique' in drvoFieldDefnFlags and gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS)
- drvoSupportsFieldDefault = 'Default' in drvoFieldDefnFlags and gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS)
+ drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
+ drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
+ drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags
for layername, layerdef in layers.items():
@@ -371,9 +390,9 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
for idx, fld_def in enumerate(fields):
fld_name = fld_def.get('name', None)
if fld_name is None or fld_name == '':
- raise Exception(f'Field #{idx} has no name')
+ raise BadConfiguration(f'Field #{idx} has no name')
if fld_name in fields_set:
- raise Exception(f'Duplicate field "{fld_name}"')
+ raise BadConfiguration(f'Duplicate field "{fld_name}"')
fields_set.add(fld_name)
fld_def2 = { 'Name': fld_name }
@@ -381,7 +400,7 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
k2 = k.lower()
if k2 == 'name':
pass
- elif k2 == 'alternativename' or k2 == 'alias':
+ elif k2 in ('alternativename', 'alias'):
setFieldIf(drvoSupportsFieldAlternativeName,
'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
log=logging.debug)
@@ -418,12 +437,15 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
setFieldIf(drvoSupportsFieldUnique,
'Unique', v, fld_def2, fld_name, drvo.ShortName)
else:
- raise Exception(f'Field "{fld_name}" has unknown key "{k}"')
+ raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')
fields[idx] = fld_def2
-# Validate the output layer against the provided SRS and creation options
-def validateOutputLayer(lyr, srs=None, options=None):
+# pylint: disable-next=too-many-branches
+def validate_olyr(lyr : ogr.Layer,
+ srs : Optional[osr.SpatialReference] = None,
+ options : Optional[dict[str, Any]] = None) -> bool:
+ """Validate the output layer against the provided SRS and creation options."""
ok = True
# ensure the output SRS is equivalent
@@ -447,7 +469,7 @@ def validateOutputLayer(lyr, srs=None, options=None):
layerDefn = lyr.GetLayerDefn()
n = layerDefn.GetGeomFieldCount()
if n != 1:
- logging.warning('Output layer "%s" has %d != 1 geometry fields', layername, n)
+ logging.warning('Output layer "%s" has %d != 1 geometry fields', lyr.GetName(), n)
geom_type1 = lyr.GetGeomType()
geom_type2 = options['geometry-type']
@@ -557,8 +579,15 @@ def validateOutputLayer(lyr, srs=None, options=None):
return ok
-# Create output layer
-def createOutputLayer(ds, layername, srs=None, options=None):
+def create_olyr(ds : gdal.Dataset,
+ layername : str,
+ srs : Optional[osr.SpatialReference] = None,
+ options : dict[str, Any]|None = None) -> ogr.Layer:
+ """Create output layer."""
+
+ if options is None or len(options) < 1:
+ raise BadConfiguration(f'Missing schema for new output layer "{layername}"')
+
logging.info('Creating new destination layer "%s"', layername)
geom_type = options['geometry-type']
lco = options.get('options', None)
@@ -590,16 +619,16 @@ def createOutputLayer(ds, layername, srs=None, options=None):
ogr.GeometryTypeToName(geom_type),
', srs="' + kwargs['srs'].GetName() + '"' if 'srs' in kwargs else '',
', options=' + str(kwargs['options']) if 'options' in kwargs else '')
- lyr = dso.CreateLayer(layername, **kwargs)
+ lyr = ds.CreateLayer(layername, **kwargs)
if lyr is None:
- raise Exception(f'Could not create destination layer "{layername}"')
+ raise RuntimeError(f'Could not create destination layer "{layername}"')
# TODO use CreateLayerFromGeomFieldDefn() from ≥v3.9 as it's not
# possible to toggle the geomfield's nullable property after fact
# otherwise
fields = options['fields']
if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField):
- raise Exception(f'Destination layer "{layername}" lacks field creation capability')
+ raise RuntimeError(f'Destination layer "{layername}" lacks field creation capability')
# set up output schema
for fld in fields:
@@ -661,16 +690,16 @@ def createOutputLayer(ds, layername, srs=None, options=None):
defn.SetUnique(v)
if lyr.CreateField(defn, approx_ok=False) != GDAL_CE_None:
- raise Exception('Could not create field "{fldName}"')
+ raise RuntimeError('Could not create field "{fldName}"')
logging.debug('Added field "%s" to output layer "%s"', fldName, layername)
- # flush before calling StartTransaction() so we're not tryingn to
+ # flush before calling StartTransaction() so we're not trying to
# rollback changes on a non-existing table
lyr.SyncToDisk()
return lyr
-# Setup output field mapping, modifying the sources dictionary in place.
-def setOutputFieldMap(defn, sources):
+def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]):
+ """Setup output field mapping, modifying the sources dictionary in place."""
fieldMap = {}
n = defn.GetFieldCount()
for i in range(n):
@@ -692,7 +721,7 @@ def setOutputFieldMap(defn, sources):
for ifld, ofld in fieldMap2.items():
i = fieldMap.get(ofld, None)
if i is None:
- raise Exception(f'Ouput layer has no field named "{ofld}"')
+ raise RuntimeError(f'Ouput layer has no field named "{ofld}"')
fieldMap2[ifld] = i
source_import['field-map'] = fieldMap2
@@ -706,20 +735,20 @@ def setOutputFieldMap(defn, sources):
rules = [rules]
for idx, rule in enumerate(rules):
if rule is None or not isinstance(rule, dict):
- raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
if 'type' not in rule:
ruleType = rule['type'] = 'literal'
else:
ruleType = rule['type']
if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
- ruleType is None or ruleType not in ['literal', 'regex']):
- raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ ruleType is None or ruleType not in ('literal', 'regex')):
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
if ruleType == 'regex':
rule['replace'] = re.compile(rule['replace'])
rules[idx] = ( rule['replace'], rule['with'] )
-# Clear the given layer (wipe all its features)
-def clearLayer(ds, lyr):
+def clearLayer(ds : gdal.Dataset, lyr : ogr.Layer) -> None:
+ """Clear the given layer (wipe all its features)"""
n = -1
if lyr.TestCapability(ogr.OLCFastFeatureCount):
n = lyr.GetFeatureCount(force=0)
@@ -741,29 +770,34 @@ def clearLayer(ds, lyr):
layername_esc, str(n) if n >= 0 else 'unknown')
ds.ExecuteSQL(query.format(table=layername_esc))
-# Extract an archive file into the given destination directory.
-def extractArchive(path, destdir, fmt=None, patterns=None, exact_matches=None):
+def extractArchive(path : Path, destdir :str,
+ fmt : str|None = None,
+ patterns : list[str]|None = None,
+ exact_matches : list[str]|None = None) -> None:
+ """Extract an archive file into the given destination directory."""
if fmt is None:
suffix = path.suffix
if suffix is None or suffix == '' or not suffix.startswith('.'):
- raise Exception(f'Could not infer archive format from "{path}"')
+ raise RuntimeError(f'Could not infer archive format from "{path}"')
fmt = suffix.removeprefix('.')
fmt = fmt.lower()
logging.debug('Unpacking %s archive %s into %s', fmt, path, destdir)
if fmt == 'zip':
- from zipfile import ZipFile
+ import zipfile # pylint: disable=import-outside-toplevel
logging.debug('Opening %s as ZipFile', path)
- with ZipFile(path, mode='r') as z:
+ with zipfile.ZipFile(path, mode='r') as z:
namelist = listArchiveMembers(z.namelist(),
patterns=patterns, exact_matches=exact_matches)
z.extractall(path=destdir, members=namelist)
else:
- raise Exception(f'Unknown archive format "{fmt}"')
+ raise RuntimeError(f'Unknown archive format "{fmt}"')
-# List archive members matching the given parterns and/or exact matches
-def listArchiveMembers(namelist, patterns, exact_matches=None):
+def listArchiveMembers(namelist : list[str],
+ patterns : list[str]|None = None,
+ exact_matches : list[str]|None = None) -> list[str]:
+    """List archive members matching the given patterns and/or exact matches."""
if patterns is None and exact_matches is None:
# if neither patterns nor exact_matches are given we'll extract the entire archive
return namelist
@@ -792,40 +826,45 @@ def listArchiveMembers(namelist, patterns, exact_matches=None):
logging.debug('Ignoring archive member %s', name)
return members
-# Import a source layer
-def importSource(lyr, path=None, unar=None, args={}, cachedir=None, extent=None):
+# pylint: disable-next=dangerous-default-value
+def importSource(lyr : ogr.Layer,
+ path : str = '',
+ unar : dict[str,Any]|None = None,
+ args : dict[str,Any] = {},
+ cachedir : Path|None = None,
+ extent : ogr.Geometry|None = None) -> None:
+ """Import a source layer"""
if unar is None:
- return importSource2(lyr, str(path), args=args,
+ return importSource2(lyr, Path(path), args=args,
basedir=cachedir, extent=extent)
- if cachedir is not None:
- path = cachedir.joinpath(path)
-
ds_srcpath = Path(args['path'])
if ds_srcpath.is_absolute():
# treat absolute paths as relative to the archive root
logging.warning('%s is absolute, removing leading anchor', ds_srcpath)
ds_srcpath = ds_srcpath.relative_to(ds_srcpath.anchor)
- ds_srcpath = str(ds_srcpath)
with tempfile.TemporaryDirectory() as tmpdir:
logging.debug('Created temporary directory %s', tmpdir)
- extractArchive(path, tmpdir,
- fmt=unar.get('format', None),
- patterns=unar.get('patterns', None),
- exact_matches=[ds_srcpath])
+ extractArchive(Path(path) if cachedir is None else cachedir.joinpath(path),
+ tmpdir,
+ fmt=unar.get('format', None),
+ patterns=unar.get('patterns', None),
+ exact_matches=[str(ds_srcpath)])
return importSource2(lyr, ds_srcpath, args=args,
basedir=Path(tmpdir), extent=extent)
-# Validate field value mapping
-def setFieldMapValue(fld, idx, val):
+def setFieldMapValue(fld : ogr.FieldDefn,
+ idx : int,
+ val : None|int|str|bytes|float) -> None|int|str|bytes|float:
+ """Validate field value mapping."""
if val is None:
if not fld.IsNullable():
logging.warning('Field "%s" is not NULLable but remaps NULL', fld.GetName())
return None
fldType = fld.GetType()
- if fldType == ogr.OFTInteger or fldType == ogr.OFTInteger64:
+ if fldType in (ogr.OFTInteger, ogr.OFTInteger64):
if isinstance(val, int):
return val
elif fldType == ogr.OFTString:
@@ -837,25 +876,28 @@ def setFieldMapValue(fld, idx, val):
elif fldType == ogr.OFTReal:
if isinstance(val, int):
return float(val)
- elif isinstance(val, float):
+ if isinstance(val, float):
return val
- raise Exception(f'Field "{fld.GetName()}" mapping #{idx} has incompatible type for {ogr.GetFieldTypeName(fldType)}')
-
-# Import a source layer (already extracted)
-# This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
-# out our own (slower) version because GDALVectorTranslate() insists in
-# calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403
-# while we want a single transaction for the entire desination layer,
-# including truncation, source imports, and metadata changes.
-def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
- kwargs, _ = gdalSetOpenExArgs(gdal, args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
+ raise RuntimeError(f'Field "{fld.GetName()}" mapping #{idx} has incompatible type '
+ f'for {ogr.GetFieldTypeName(fldType)}')
+
+# pylint: disable-next=too-many-branches, too-many-locals, too-many-statements
+def importSource2(lyr_dst : ogr.Layer, path : Path, args : dict[str,Any],
+ basedir : Path|None, extent : ogr.Geometry|None) -> None:
+ """Import a source layer (already extracted)
+ This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
+ out our own (slower) version because GDALVectorTranslate() insists in
+ calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403
+ while we want a single transaction for the entire destination layer,
+ including truncation, source imports, and metadata changes."""
+ kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
path2 = path if basedir is None else str(basedir.joinpath(path))
logging.debug('OpenEx(%s, %s)', path2, str(kwargs))
ds = gdal.OpenEx(path2, **kwargs)
if ds is None:
- raise Exception(f'Could not open {path2}')
+ raise RuntimeError(f'Could not open {path2}')
layername = args.get('layername', None)
if layername is None:
@@ -869,21 +911,22 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
lyr = ds.GetLayerByName(layername)
msg = '"' + layername + '"'
if lyr is None:
- raise Exception(f'Could not get requested layer {msg} from {path2}')
+ raise RuntimeError(f'Could not get requested layer {msg} from {path2}')
logging.info('Importing layer %s from "%s"', msg, path)
canIgnoreFields = lyr.TestCapability(ogr.OLCIgnoreFields)
srs = lyr.GetSpatialRef()
if srs is None:
- raise Exception('Source layer has no SRS')
+ raise RuntimeError('Source layer has no SRS')
srs_dst = lyr_dst.GetSpatialRef()
if srs_dst is None:
logging.warning('Destination has no SRS, skipping coordinate transformation')
ct = None
elif srs_dst.IsSame(srs):
- logging.debug('Both source and destination have the same SRS (%s), skipping coordinate transformation',
+ logging.debug('Both source and destination have the same SRS (%s), '
+ 'skipping coordinate transformation',
srs_dst.GetName())
ct = None
else:
@@ -893,8 +936,8 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
srs.GetName(), srs_dst.GetName())
ct = osr.CoordinateTransformation(srs, srs_dst)
if ct is None:
- raise Exception(f'Could not create transformation from source SRS ({srs.GetName()}) '
- + f'to destination SRS ({srs_dst.GetName()})')
+ raise RuntimeError(f'Could not create transformation from source SRS ({srs.GetName()}) '
+ f'to destination SRS ({srs_dst.GetName()})')
defn = lyr.GetLayerDefn()
geomFieldCount = defn.GetGeomFieldCount()
@@ -940,7 +983,8 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
else:
extent2 = extent.Clone()
if extent2.TransformTo(srs) != ogr.OGRERR_NONE:
- raise Exception(f'Could not transform extent {extent.ExportToWkt()} to {srs.GetName()}')
+ raise RuntimeError(f'Could not transform extent {extent.ExportToWkt()} '
+ f'to {srs.GetName()}')
#logging.debug('Applying extent: %s', extent2.ExportToWkt())
lyr.SetSpatialFilter(extent2)
@@ -963,7 +1007,7 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
for fldName, rules in args.get('value-map', {}).items():
i = defn.GetFieldIndex(fldName)
if i < 0:
- raise Exception(f'Source layer "{layername}" has no field named "{fldName}"')
+ raise RuntimeError(f'Source layer "{layername}" has no field named "{fldName}"')
if fieldMap[i] < 0:
logging.warning('Ignored source field "%s" has value map', fldName)
continue
@@ -973,10 +1017,12 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
mapping = []
fld = defn.GetFieldDefn(i)
for idx, (rFrom, rTo) in enumerate(rules):
- # use fld for both 'from' and 'to' (the types must match, casting is not allowed in the mapping)
+ # use fld for both 'from' and 'to' (the types must match, casting is not
+ # allowed in the mapping)
if rFrom is None:
if hasNullReplacement:
- logging.warning('Field "%s" has duplicate NULL replacement', fld.GetName())
+ logging.warning('Field "%s" has duplicate NULL replacement',
+ fld.GetName())
else:
setFieldMapValue(fld, idx, None) # validate NULL
rTo = setFieldMapValue(fld, idx, rTo)
@@ -1015,7 +1061,7 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
for i, nullReplacement, mapping in valueMap:
if not feature.IsFieldSet(i):
continue
- elif feature.IsFieldNull(i):
+ if feature.IsFieldNull(i):
if nullReplacement is not None:
# replace NULL with non-NULL value
feature.SetField(i, nullReplacement)
@@ -1033,10 +1079,10 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
m = rFrom.fullmatch(v)
if m is None:
continue
- elif rTo is not None:
+ if rTo is not None:
rTo = rTo.format(*m.groups())
else:
- raise Exception(str(rType))
+ raise RuntimeError(str(rType))
if rTo is None:
# replace non-NULL value with NULL
@@ -1053,16 +1099,17 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
geom = feature2.GetGeometryRef()
if geom is None:
if eGType_dst != ogr.wkbNone:
- logging.warning('Source feature #%d has no geometry, trying to transfer anyway', feature.GetFID())
+ logging.warning('Source feature #%d has no geometry, trying to transfer anyway',
+ feature.GetFID())
else:
if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE:
- raise Exception('Could not apply coordinate transformation')
+ raise RuntimeError('Could not apply coordinate transformation')
eGType = geom.GetGeometryType()
if eGType != eGType_dst and not dGeomIsUnknown:
# Promote to multi, cf. apps/ogr2ogr_lib.cpp:ConvertType()
eGType2 = eGType
- if eGType == ogr.wkbTriangle or eGType == ogr.wkbTIN or eGType == ogr.wkbPolyhedralSurface:
+ if eGType in (ogr.wkbTriangle, ogr.wkbTIN, ogr.wkbPolyhedralSurface):
eGType2 = ogr.wkbMultiPolygon
elif not ogr.GT_IsSubClassOf(eGType, ogr.wkbGeometryCollection):
eGType2 = ogr.GT_GetCollection(eGType)
@@ -1073,18 +1120,19 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
geom = ogr.ForceTo(geom, eGType_dst)
# TODO call MakeValid()?
else:
- raise Exception(f'Conversion from {ogr.GeometryTypeToName(eGType)} '
- f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
+ raise RuntimeError(f'Conversion from {ogr.GeometryTypeToName(eGType)} '
+ f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
feature2.SetGeometryDirectly(geom)
if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
- raise Exception(f'Could not transfer source feature #{feature.GetFID()}')
+ raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
featureCount += 1
feature = lyr.GetNextFeature()
if bValueMap:
- valueMapCounts = [ (lyr.GetLayerDefn().GetFieldDefn(i).GetName(), k) for i,k in enumerate(valueMapCounts) if k > 0 ]
+ valueMapCounts = [ (lyr.GetLayerDefn().GetFieldDefn(i).GetName(), k)
+ for i,k in enumerate(valueMapCounts) if k > 0 ]
lyr = None
logging.info('Imported %d features from source layer "%s"', featureCount, layername)
@@ -1103,7 +1151,173 @@ def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-if __name__ == '__main__':
+# pylint: disable-next=redefined-builtin
+def gdalVersionMin(maj : int = 0, min : int = 0, rev : int = 0) -> bool:
+ """Return a boolean indicating whether the installed GDAL version is
+ greater than or equal to the provided (maj, min, rev) triplet."""
+
+ if maj < 1 or (maj == 1 and min < 10):
+ # GDAL_VERSION_NUM() macro was changed in 1.10. That version
+ # was released in 2013 so we blindly assume the installed
+ # version is more recent
+ return True
+
+ version_cur = int(gdal.VersionInfo())
+ # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
+ version_min = maj*1000000 + min*10000 + rev*100
+ return version_min <= version_cur
+
+def gdalGetMetadataItem(obj : gdal.MajorObject, k : str) -> bool:
+ """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
+
+ v = obj.GetMetadataItem(k)
+ if v is not None and isinstance(v, str):
+ return v.upper() == 'YES'
+
+ return False
+
+def escape_identifier(identifier : str) -> str:
+ """Escape the given identifier, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
+
+ if '\x00' in identifier:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Invalid identifier "{identifier}"')
+
+ # SQL:1999 delimited identifier
+ return '"' + identifier.replace('"', '""') + '"'
+
+# pylint: disable-next=dangerous-default-value
+def gdalSetOpenExArgs(option_dict : Optional[dict[str, Any]] = {},
+ flags : int = 0) -> tuple[dict[str, int|list[str]], gdal.Driver]:
+ """Return a pair kwargs and driver to use with gdal.OpenEx()."""
+
+ kwargs = { 'nOpenFlags': GDAL_OF_VECTOR | flags }
+
+ fmt = option_dict.get('format', None)
+ if fmt is None:
+ drv = None
+ else:
+ drv = gdal.GetDriverByName(fmt)
+ if drv is None:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Unknown driver name "{fmt}"')
+ if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
+ kwargs['allowed_drivers'] = [ drv.ShortName ]
+
+ oo = option_dict.get('open-options', None)
+ if oo is not None:
+ kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
+ return kwargs, drv
+
+def getSRS(srs_str : Optional[str]) -> osr.SpatialReference:
+ """Return the decoded Spatial Reference System."""
+
+ if srs_str is None:
+ return None
+
+ srs = osr.SpatialReference()
+ if srs_str.startswith('EPSG:'):
+ code = int(srs_str.removeprefix('EPSG:'))
+ srs.ImportFromEPSG(code)
+ else:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Unknown SRS {srs_str}')
+
+ logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
+ return srs
+
+def getExtent(extent : Optional[tuple[float, float, float, float]],
+ srs : Optional[osr.SpatialReference] = None) -> tuple[ogr.Geometry, ogr.Geometry]:
+ """Convert extent (minX, minY, maxX, maxY) into a polygon and assign the
+ given SRS. Return a pair with the densified and non-densified extent.
+ Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
+ polygon to make sure it is sufficiently densified when transforming to
+ source layer SRS for spatial filtering."""
+
+ if extent is None:
+ return None, None
+
+ if not isinstance(extent, tuple) or len(extent) != 4:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Invalid extent {extent}')
+ if srs is None:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception('Configured extent but no SRS')
+
+ logging.debug('Configured extent in %s: %s', srs.GetName(),
+ ', '.join(map(str, extent)))
+
+ ring = ogr.Geometry(ogr.wkbLinearRing)
+ ring.AddPoint_2D(extent[0], extent[1])
+ ring.AddPoint_2D(extent[2], extent[1])
+ ring.AddPoint_2D(extent[2], extent[3])
+ ring.AddPoint_2D(extent[0], extent[3])
+ ring.AddPoint_2D(extent[0], extent[1])
+
+ polygon = ogr.Geometry(ogr.wkbPolygon)
+ polygon.AddGeometry(ring)
+
+ # we expressed extent as minX, minY, maxX, maxY (easting/northing
+ # ordered, i.e., in traditional GIS order)
+ srs2 = srs.Clone()
+ srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
+ polygon.AssignSpatialReference(srs2)
+ if not srs2.IsSame(srs):
+ polygon.TransformTo(srs)
+
+ # densify the rectangle to avoid issues when reprojecting to the
+ # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
+ polygon_dense = polygon.Clone()
+ segment_distance_metre = 10 * 1000
+ if srs.IsGeographic():
+ dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
+ polygon_dense.Segmentize(dfMaxLength)
+ elif srs.IsProjected():
+ dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
+ polygon_dense.Segmentize(dfMaxLength)
+
+ return polygon_dense, polygon
+
+def validate_sources(layers : dict[str, Any]) -> None:
+ """Mangle and validate layer sources and import definitions"""
+ toremove = set()
+ for layername, layerdefs in layers.items():
+ sources = layerdefs.get('sources', None)
+ if sources is None or len(sources) < 1:
+ logging.warning('Output layer "%s" has no definition, skipping', layername)
+ toremove.add(layername)
+ continue
+
+ for idx, layerdef in enumerate(sources):
+ importdef = layerdef.get('import', None)
+ if importdef is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import definition')
+
+ sourcedef = layerdef.get('source', None)
+ unar = None if sourcedef is None else sourcedef.get('unar', None)
+ src = None if sourcedef is None else sourcedef.get('path', None)
+
+ ds_srcpath = importdef.get('path', None)
+ if src is None and unar is None and ds_srcpath is not None:
+ # fall back to import:path if there is no unarchiving recipe
+ src = ds_srcpath
+ if unar is not None and ds_srcpath is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import source path')
+ if src is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no source path')
+ layerdef['source'] = { 'path': src, 'unar': unar }
+
+ for layername in toremove:
+ layers.pop(layername)
+
+# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
+def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
@@ -1112,41 +1326,23 @@ if __name__ == '__main__':
parser.add_argument('--debug', action='count', default=0,
help=argparse.SUPPRESS)
parser.add_argument('--lockfile', default=None,
- help=f'obtain an exclusive lock before starting unpacking and importing')
+ help='obtain an exclusive lock before starting unpacking and importing')
parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
args = parser.parse_args()
- if args.debug > 0:
+ if args.debug > 0: # pylint: disable=duplicate-code
logging.getLogger().setLevel(logging.DEBUG)
if args.debug > 1:
gdal.ConfigurePythonLogging(enable_debug=True)
- config = common.get_config(groupnames=None if args.groupname == [] else args.groupname)
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
# validate configuration
if 'dataset' not in config:
- raise Exception('Configuration does not specify output dataset')
+ raise BadConfiguration('Configuration does not specify output dataset')
layers = config.get('layers', {})
- for layername, layerdefs in layers.items():
- for idx, layerdef in enumerate(layerdefs['sources']):
- importdef = layerdef.get('import', None)
- if importdef is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no import definition')
-
- sourcedef = layerdef.get('source', None)
- unar = None if sourcedef is None else sourcedef.get('unar', None)
- src = None if sourcedef is None else sourcedef['cache'].get('path', None)
-
- ds_srcpath = importdef.get('path', None)
- if src is None and unar is None and ds_srcpath is not None:
- # fallback to importe:path if there is no unarchiving recipe
- src = Path(ds_srcpath)
- if unar is not None and ds_srcpath is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no import source path')
- if src is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no source path')
- layerdef['source'] = { 'path': src, 'unar': unar }
+ validate_sources(layers)
# set global GDAL/OGR configuration options
for pszKey, pszValue in config.get('GDALconfig', {}).items():
@@ -1156,13 +1352,13 @@ if __name__ == '__main__':
# open output dataset (possibly create it first)
dso = openOutputDS(config['dataset'])
- validateSchema(layers,
+ validate_schema(layers,
drvo=dso.GetDriver(),
lco_defaults=config['dataset'].get('create-layer-options', None))
# get configured Spatial Reference System and extent
- srs = common.getSRS(osr, config.get('SRS', None))
- extent = common.getExtent(config.get('extent', None), srs=srs)[0]
+ srs = getSRS(config.get('SRS', None))
+ extent = getExtent(config.get('extent', None), srs=srs)[0]
if args.lockfile is not None:
# obtain an exclusive lock and don't release it until exiting the program
@@ -1170,84 +1366,129 @@ if __name__ == '__main__':
logging.debug('flock("%s", LOCK_EX)', args.lockfile)
flock(lock_fd, LOCK_EX)
- cachedir = Path(args.cachedir) if args.cachedir is not None else None
- rv = 0
+ # create all output layers before starting the transaction
for layername, layerdef in layers.items():
- sources = layerdef['sources']
- if sources is None or len(sources) < 1:
- logging.warning('Output layer "%s" has no definition, skipping', layername)
+ lyr = dso.GetLayerByName(layername)
+ if lyr is not None:
+ # TODO dso.DeleteLayer(layername) if --overwrite and
+ # dso.TestCapability(ogr.ODsCDeleteLayer)
+ # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
continue
+ if not dso.TestCapability(ogr.ODsCCreateLayer):
+ raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
+ 'support layer creation')
+ create_olyr(dso, layername, srs=srs, options=layerdef.get('create', None))
- logging.info('Processing output layer "%s"', layername)
- transaction = False
- try:
- # get output layer
- outLayerIsNotEmpty = True
- lco = layerdef.get('create', None)
- lyr = dso.GetLayerByName(layername)
- if lyr is not None:
- # TODO dso.DeleteLayer(layername) if --overwrite and dso.TestCapability(ogr.ODsCDeleteLayer)
- # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
- validateOutputLayer(lyr, srs=srs, options=lco)
- # TODO bail out if all source files are older than lyr's last_change
- elif not dso.TestCapability(ogr.ODsCCreateLayer):
- raise Exception(f'Output driver {dso.GetDriver().ShortName} does not support layer creation')
- elif lco is None or len(lco) < 1:
- raise Exception(f'Missing schema for new output layer "{layername}"')
- else:
- lyr = createOutputLayer(dso, layername, srs=srs, options=lco)
- outLayerIsNotEmpty = False
+ cachedir = Path(args.cachedir) if args.cachedir is not None else None
+ if (dso.TestCapability(ogr.ODsCTransactions) and
+ dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
+ logging.debug('Starting transaction')
+ dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
+ else:
+ logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
+ dso.GetDriver().ShortName)
+ dsoTransaction = False
+
+ rv = 0
+ try:
+ for layername, layerdef in layers.items():
+ logging.info('Processing output layer "%s"', layername)
+ lyr = dso.GetLayerByName(layername)
+ if lyr is None:
+ raise RuntimeError(f'Failed to create output layer "{layername}"??')
if not lyr.TestCapability(ogr.OLCSequentialWrite):
- raise Exception(f'Output layer "{layername}" has no working CreateFeature() method')
+ raise RuntimeError(f'Output layer "{layername}" has no working '
+ 'CreateFeature() method')
+ validate_olyr(lyr, srs=srs, options=layerdef['create'])
+
+ sources = layerdef['sources']
# setup output field mapping in the sources dictionary
setOutputFieldMap(lyr.GetLayerDefn(), sources)
- # start transaction if possible
- if lyr.TestCapability(ogr.OLCTransactions):
+ if dsoTransaction:
+ lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
+ logging.debug(lyrTransaction)
+ dso.ExecuteSQL(lyrTransaction)
+ elif lyr.TestCapability(ogr.OLCTransactions):
+ # start transaction if possible
logging.debug('Starting transaction')
- transaction = lyr.StartTransaction() == ogr.OGRERR_NONE
+ lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
else:
- logging.warning('Unsafe update, output layer "%s" does not support transactions', layername)
+ logging.warning('Unsafe update, output layer "%s" does not support transactions',
+ layername)
+ lyrTransaction = False
- if outLayerIsNotEmpty:
- # clear non-empty output layer
+ try:
clearLayer(dso, lyr)
- description = layerdef.get('description', None)
- if description is not None and lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None:
- logging.warning('Could not set description metadata')
-
- for source in sources:
- # import source layers
- importSource(lyr, **source['source'], args=source['import'],
- cachedir=cachedir, extent=extent)
-
- if transaction:
- # commit transaction
- logging.debug('Committing transaction')
- transaction = False
- if lyr.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
- rv = 1
-
- except Exception:
- if transaction:
- logging.error('Exception occured in transaction, rolling back')
- try:
- if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
- logging.exception('Could not import layer "%s"', layername)
- rv = 1
-
- finally:
- # close output layer
- lyr = None
+ description = layerdef.get('description', None)
+ if (description is not None and
+ lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+ logging.warning('Could not set description metadata')
+
+ for source in sources:
+ # import source layers
+ importSource(lyr, **source['source'], args=source['import'],
+ cachedir=cachedir, extent=extent)
+
+ if isinstance(lyrTransaction, bool) and lyrTransaction:
+ # commit transaction
+ logging.debug('Committing transaction')
+ lyrTransaction = False
+ if lyr.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
+ rv = 1
+
+ except Exception: # pylint: disable=broad-exception-caught
+ if isinstance(lyrTransaction, str):
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.exception('Exception occured in transaction, %s', query)
+ logging.debug(query)
+ dso.ExecuteSQL(query)
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
+ logging.exception('Exception occured in transaction, rolling back')
+ try:
+ if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ except RuntimeError:
+ logging.exception('Could not rollback transaction')
+ else:
+ traceback.print_exc()
+ rv = 1
+
+ finally:
+ lyr = None # close output layer
+ if isinstance(lyrTransaction, str):
+ query = 'RELEASE ' + lyrTransaction
+ logging.debug(query)
+ dso.ExecuteSQL(query)
+
+ if dsoTransaction:
+ # commit transaction
+ logging.debug('Committing transaction')
+ dsoTransaction = False
+ if dso.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
+ rv = 1
+
+ except Exception: # pylint: disable=broad-exception-caught
+ if dsoTransaction:
+ logging.exception('Exception occured in transaction, rolling back')
+ try:
+ if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ except RuntimeError:
+ logging.exception('Could not rollback transaction')
+ else:
+ traceback.print_exc()
+ rv = 1
dso = None
srs = None
extent = None
- exit(rv)
+ sys.exit(rv)
+
+gdal.UseExceptions()
+main()