Diffstat (limited to 'webmap-import')
-rwxr-xr-x  webmap-import  1658
1 file changed, 557 insertions(+), 1101 deletions(-)
diff --git a/webmap-import b/webmap-import
index c1d07a3..e5a1426 100755
--- a/webmap-import
+++ b/webmap-import
@@ -2,7 +2,7 @@
#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
-# Copyright © 2024 Guilhem Moulin <info@guilhem.se>
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -18,319 +18,67 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
+# pylint: disable=invalid-name, missing-module-docstring, fixme
+
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
+from stat import S_ISREG
+import sys
+from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
-import tempfile
import re
-import math
-from fnmatch import fnmatchcase
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
from pathlib import Path
+from time import monotonic as time_monotonic
+from typing import Any, Optional, NoReturn
+import traceback
from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
- OF_VECTOR as GDAL_OF_VECTOR,
- OF_ALL as GDAL_OF_ALL,
- OF_READONLY as GDAL_OF_READONLY,
- OF_UPDATE as GDAL_OF_UPDATE,
- OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR,
CE_None as GDAL_CE_None,
- DCAP_CREATE as GDAL_DCAP_CREATE,
- DCAP_VECTOR as GDAL_DCAP_VECTOR,
DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
-import osgeo.gdalconst as gdalconst
-gdal.UseExceptions()
+from osgeo import gdalconst
import common
-
-# Wrapper around gdal.MajorObject.GetMetadataItem(name)
-def getMetadataItem(o, k):
- v = o.GetMetadataItem(k)
- if v is not None and isinstance(v, str):
- return v.upper() == 'YES'
- else:
- return False
-
-# Return kwargs and driver for OpenEx()
-def setOpenExArgs(option_dict, flags=0):
- kwargs = { 'nOpenFlags': GDAL_OF_VECTOR | flags }
-
- fmt = option_dict.get('format', None)
- if fmt is None:
- drv = None
- else:
- drv = gdal.GetDriverByName(fmt)
- if drv is None:
- raise Exception(f'Unknown driver name "{fmt}"')
- elif not getMetadataItem(drv, GDAL_DCAP_VECTOR):
- raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
- kwargs['allowed_drivers'] = [ drv.ShortName ]
-
- oo = option_dict.get('open-options', None)
- if oo is not None:
- kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
- return kwargs, drv
-
-# Open and return the output DS. It is created if create=False or
-# create-options is a non-empty dictionary.
-def openOutputDS(def_dict):
- path = def_dict['path']
- kwargs, drv = setOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
- try:
- logging.debug('OpenEx(%s, %s)', path, str(kwargs))
- return gdal.OpenEx(path, **kwargs)
- except RuntimeError as e:
- if not (gdal.GetLastErrorType() >= gdalconst.CE_Failure and
- gdal.GetLastErrorNo() == gdalconst.CPLE_OpenFailed):
- # not an open error
- raise e
-
- dso2 = None
- try:
- dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL | GDAL_OF_UPDATE)
- except Exception:
- pass
- if dso2 is not None:
- # path exists but can't be open with OpenEx(path, **kwargs)
- raise e
-
- try:
- dso2 = gdal.OpenEx(path, nOpenFlags=GDAL_OF_ALL)
- except Exception:
- pass
- if dso2 is not None:
- # path exists but can't be open with OpenEx(path, **kwargs)
- raise e
-
- dsco = def_dict.get('create-options', None)
- if not def_dict.get('create', False) and dsco is None:
- # not configured for creation
- raise e
- if drv is None or not getMetadataItem(drv, GDAL_DCAP_CREATE):
- # not capable of creation
- raise e
-
- if 'open_options' in kwargs:
- # like ogr2ogr(1)
- logging.warning('Destination\'s open options ignored ' +
- 'when creating the output datasource')
-
- kwargs2 = { 'eType': gdal.GDT_Unknown }
- if dsco is not None:
- kwargs2['options'] = [ k + '=' + str(v) for k, v in dsco.items() ]
-
- logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs2['eType'],
- ', options=' + str(kwargs2['options']) if 'options' in kwargs2 else '')
- # XXX racy, a GDAL equivalent of O_EXCL would be nice
- return drv.Create(path, 0, 0, 0, **kwargs2)
-
-# cf. ogr/ogrgeometry.cpp:OGRFromOGCGeomType()
-def fromGeomTypeName(name):
- name = name.upper()
-
- isMeasured = False
- if name.endswith('M'):
- isMeasured = True
- name = name.removesuffix('M')
-
- convertTo3D = False
- if name.endswith('Z'):
- convertTo3D = True
- name = name.removesuffix('Z')
-
- if name == 'POINT':
- eGType = ogr.wkbPoint
- elif name == 'LINESTRING':
- eGType = ogr.wkbLineString
- elif name == 'POLYGON':
- eGType = ogr.wkbPolygon
- elif name == 'MULTIPOINT':
- eGType = ogr.wkbMultiPoint
- elif name == 'MULTILINESTRING':
- eGType = ogr.wkbMultiLineString
- elif name == 'MULTIPOLYGON':
- eGType = ogr.wkbMultiPolygon
- elif name == 'GEOMETRYCOLLECTION':
- eGType = ogr.wkbGeometryCollection
- elif name == 'CIRCULARSTRING':
- eGType = ogr.wkbCircularString
- elif name == 'COMPOUNDCURVE':
- eGType = ogr.wkbCompoundCurve
- elif name == 'CURVEPOLYGON':
- eGType = ogr.wkbCurvePolygon
- elif name == 'MULTICURVE':
- eGType = ogr.wkbMultiCurve
- elif name == 'MULTISURFACE':
- eGType = ogr.wkbMultiSurface
- elif name == 'TRIANGLE':
- eGType = ogr.wkbTriangle
- elif name == 'POLYHEDRALSURFACE':
- eGType = ogr.wkbPolyhedralSurface
- elif name == 'TIN':
- eGType = ogr.wkbTIN
- elif name == 'CURVE':
- eGType = ogr.wkbCurve
- elif name == 'SURFACE':
- eGType = ogr.wkbSurface
- else:
- eGType = ogr.wkbUnknown
-
- if convertTo3D:
- eGType = ogr.GT_SetZ(eGType)
-
- if isMeasured:
- eGType = ogr.GT_SetM(eGType)
-
- return eGType
-
-# Parse geometry type, cf. ogr2ogr_lib.cpp
-def parseGeomType(name):
- if name is None:
- return ogr.wkbUnknown
- name2 = name.upper()
-
- is3D = False
- if name2.endswith('25D'):
- name2 = name2[:-3] # alias
- is3D = True
- elif name2.endswith('Z'):
- name2 = name2[:-1]
- is3D = True
-
- if name2 == 'NONE':
- eGType = ogr.wkbNone
- elif name2 == 'GEOMETRY':
- eGType = ogr.wkbUnknown
- else:
- eGType = fromGeomTypeName(name2)
- if eGType == ogr.wkbUnknown:
- raise Exception(f'Unknown geometry type "{name}"')
-
- if eGType != ogr.wkbNone and is3D:
- eGType = ogr.GT_SetZ(eGType)
-
- return eGType
-
-# cf. ogr/ogr_core.h's enum OGRFieldType;
-def parseFieldType(name):
- if name is None:
- raise Exception('parseFieldType(None)')
- name2 = name.lower()
- if name2 == 'integer':
- # simple 32bit integer
- return ogr.OFTInteger
- elif name2 == 'integerlist':
- # List of 32bit integers
- return ogr.OFTIntegerList
- elif name2 == 'real':
- # Double Precision floating point
- return ogr.OFTReal
- elif name2 == 'reallist':
- # List of doubles
- return ogr.OFTRealList
- elif name2 == 'string':
- # String of ASCII chars
- return ogr.OFTString
- elif name2 == 'stringlist':
- # Array of strings
- return ogr.OFTStringList
- elif name2 == 'binary':
- # Raw Binary data
- return ogr.OFTBinary
- elif name2 == 'date':
- # Date
- return ogr.OFTDate
- elif name2 == 'time':
- # Time
- return ogr.OFTTime
- elif name2 == 'datetime':
- # Date and Time
- return ogr.OFTDateTime
- elif name2 == 'integer64':
- # Single 64bit integer
- return ogr.OFTInteger64
- elif name2 == 'integer64list':
- # List of 64bit integers
- return ogr.OFTInteger64List
- else:
- raise Exception(f'Unknown field type "{name}"')
-
-# cf. ogr/ogr_core.h's enum OGRFieldSubType;
-def parseSubFieldType(name):
- if name is None:
- raise Exception('parseSubFieldType(None)')
- name2 = name.lower()
- if name2 == 'none':
- # No subtype. This is the default value.
- return ogr.OFSTNone
- elif name2 == 'bool':
- # Boolean integer. Only valid for OFTInteger and OFTIntegerList.
- return ogr.OFSTBoolean
- elif name2 == 'int16':
- # Signed 16-bit integer. Only valid for OFTInteger and OFTIntegerList.
- return ogr.OFSTInt16
- elif name2 == 'float32':
- # Single precision (32 bit) floating point. Only valid for OFTReal and OFTRealList.
- return ogr.OFSTFloat32
- elif name2 == 'json':
- # JSON content. Only valid for OFTString.
- return ogr.OFSTJSON
- elif name2 == 'uuid':
- # UUID string representation. Only valid for OFTString.
- return ogr.OFSTUUID
- else:
- raise Exception(f'Unknown field subtype "{name}"')
-
-# Parse timezone
-TZ_RE = re.compile(r'(?:UTC\b)?([\+\-]?)([0-9][0-9]):?([0-9][0-9])', flags=re.IGNORECASE)
-def parseTimeZone(tz):
- if tz is None:
- raise Exception('parseTimeZone(None)')
- tz2 = tz.lower()
- if tz2 == 'none':
- return ogr.TZFLAG_UNKNOWN
- elif tz2 == 'local':
- return ogr.TZFLAG_LOCALTIME
- elif tz2 == 'utc' or tz2 == 'gmt':
- return ogr.TZFLAG_UTC
-
- m = TZ_RE.fullmatch(tz)
- if m is None:
- raise Exception(f'Invalid timezone "{tz}"')
- tzSign = m.group(1)
- tzHour = int(m.group(2))
- tzMinute = int(m.group(3))
- if tzHour > 14 or tzMinute >= 60 or tzMinute % 15 != 0:
- raise Exception(f'Invalid timezone "{tz}"')
- tzFlag = tzHour*4 + int(tzMinute/15)
- if tzSign == '-':
- tzFlag = 100 - tzFlag
- else:
- tzFlag += 100
- return tzFlag
-
-# Pretty-print timezone flag, cf.
-# ogr/ogrutils.cpp:OGRGetISO8601DateTime()
-def formatTZFlag(tzFlag):
- if tzFlag is None:
- raise Exception('printTimeZone(None)')
- if tzFlag == ogr.TZFLAG_UNKNOWN:
- return 'none'
- elif tzFlag == ogr.TZFLAG_LOCALTIME:
- return 'local'
- elif tzFlag == ogr.TZFLAG_UTC:
- return 'UTC'
-
- tzOffset = abs(tzFlag - 100) * 15;
- tzHour = int(tzOffset / 60);
- tzMinute = int(tzOffset % 60);
- tzSign = '+' if tzFlag > 100 else '-'
- return f'{tzSign}{tzHour:02}{tzMinute:02}'
-
-def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning):
+from common import (
+ BadConfiguration,
+ parse_config_dl,
+ escape_identifier,
+ escape_literal_str,
+ getSourcePathLockFileName
+)
+from common_gdal import (
+ gdalVersionMin,
+ gdalGetMetadataItem,
+ getSRS,
+ getExtent,
+ parseGeomType,
+ parseFieldType,
+ parseSubFieldType,
+ parseTimeZone
+)
+from import_source import (
+ openOutputDS,
+ createOutputLayer,
+ validateOutputLayer,
+ importSources,
+ ImportStatus
+)
+from export_mvt import exportMVT
+
+def setFieldIf(cond : bool,
+ attrName : str,
+ val : Any,
+ data : dict[str, Any],
+ fldName : str,
+ drvName : str,
+ log = logging.warning) -> None:
+ """Conditionally set a field"""
if cond:
data[attrName] = val
else:
@@ -341,19 +89,23 @@ def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning)
log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
attrName, val2, fldName, drvName)
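
A minimal usage sketch (hypothetical field and driver names): when the capability flag is off, the attribute is only logged, not recorded in the parsed field definition.

    # Hypothetical sketch, assuming the setFieldIf() definition above.
    fld_def2 = {'Name': 'id'}
    setFieldIf(False, 'Nullable', False, fld_def2, 'id', 'ESRI Shapefile')
    assert 'Nullable' not in fld_def2   # dropped, with a warning logged
    setFieldIf(True, 'Nullable', False, fld_def2, 'id', 'PostgreSQL')
    assert fld_def2['Nullable'] is False
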
-# Validate layer creation options and schema. The schema is modified in
-# place with the parsed result.
-# (We need the driver of the output dataset to determine capability on
-# constraints.)
-def validateSchema(layers, drvo=None, lco_defaults=None):
+# pylint: disable-next=too-many-branches
+def validate_schema(layers : dict[str, Any],
+ drvo : Optional[gdal.Driver] = None,
+ lco_defaults : Optional[dict[str, str]] = None) -> None:
+ """Validate layer creation options and schema. The schema is
+ modified in place with the parsed result.
+ (We need the driver of the output dataset to determine capability on
+ constraints.)"""
+
# Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
- if common.gdal_version_min(maj=3, min=7):
+ if gdalVersionMin(maj=3, min=7):
# list of capability flags supported by the CreateField() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
# GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
- hasTZFlagSupport = common.gdal_version_min(maj=3, min=8)
+ hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
else:
# list of flags supported by the OGRLayer::AlterFieldDefn() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
@@ -364,9 +116,12 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
# cache driver capabilities
drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
- drvoSupportsFieldNullable = 'Nullable' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS)
- drvoSupportsFieldUnique = 'Unique' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS)
- drvoSupportsFieldDefault = 'Default' in drvoFieldDefnFlags and getMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS)
+ drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
+ drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
+ drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags
for layername, layerdef in layers.items():
@@ -397,9 +152,9 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
for idx, fld_def in enumerate(fields):
fld_name = fld_def.get('name', None)
if fld_name is None or fld_name == '':
- raise Exception(f'Field #{idx} has no name')
+ raise BadConfiguration(f'Field #{idx} has no name')
if fld_name in fields_set:
- raise Exception(f'Duplicate field "{fld_name}"')
+ raise BadConfiguration(f'Duplicate field "{fld_name}"')
fields_set.add(fld_name)
fld_def2 = { 'Name': fld_name }
@@ -407,7 +162,7 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
k2 = k.lower()
if k2 == 'name':
pass
- elif k2 == 'alternativename' or k2 == 'alias':
+ elif k2 in ('alternativename', 'alias'):
setFieldIf(drvoSupportsFieldAlternativeName,
'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
log=logging.debug)
@@ -444,312 +199,12 @@ def validateSchema(layers, drvo=None, lco_defaults=None):
setFieldIf(drvoSupportsFieldUnique,
'Unique', v, fld_def2, fld_name, drvo.ShortName)
else:
- raise Exception(f'Field "{fld_name}" has unknown key "{k}"')
+ raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')
fields[idx] = fld_def2
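
For reference, a hypothetical layer definition that validate_schema() accepts; the lower-case keys follow the branches above, and each entry of 'fields' is rewritten in place into a parsed dict with capitalized keys ('Name', 'Nullable', 'Unique', …). All names and values are illustrative only.

    layers = {
        'my_layer': {
            'create': {
                'geometry-type': 'MULTIPOLYGON',
                'fields': [
                    {'name': 'id', 'type': 'Integer64',
                     'nullable': False, 'unique': True},
                    {'name': 'name', 'type': 'String', 'width': 255},
                ],
            },
        },
    }
    validate_schema(layers, drvo=gdal.GetDriverByName('PostgreSQL'))
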
-# Return the decoded Spatial Reference System
-def getSRS(srs_str):
- if srs_str is None:
- return
- srs = osr.SpatialReference()
- if srs_str.startswith('EPSG:'):
- code = int(srs_str.removeprefix('EPSG:'))
- srs.ImportFromEPSG(code)
- else:
- raise Exception(f'Unknown SRS {srs_str}')
- logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
- return srs
-
-# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
-# given SRS. Like apps/ogr2ogr_lib.cpp, we segmentize the polygon to
-# make sure it is sufficiently densified when transforming to source
-# layer SRS for spatial filtering.
-def getExtent(extent, srs=None):
- if extent is None:
- return
-
- if not (isinstance(extent, list) or isinstance(extent, tuple)) or len(extent) != 4:
- raise Exception(f'Invalid extent {extent}')
- elif srs is None:
- raise Exception('Configured extent but no SRS')
-
- logging.debug('Configured extent in %s: %s',
- srs.GetName(), ', '.join(map(str, extent)))
-
- ring = ogr.Geometry(ogr.wkbLinearRing)
- ring.AddPoint_2D(extent[0], extent[1])
- ring.AddPoint_2D(extent[2], extent[1])
- ring.AddPoint_2D(extent[2], extent[3])
- ring.AddPoint_2D(extent[0], extent[3])
- ring.AddPoint_2D(extent[0], extent[1])
-
- polygon = ogr.Geometry(ogr.wkbPolygon)
- polygon.AddGeometry(ring)
-
- # we expressed extent as minX, minY, maxX, maxY (easting/northing
- # ordered, i.e., in traditional GIS order)
- srs2 = srs.Clone()
- srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
- polygon.AssignSpatialReference(srs2)
- polygon.TransformTo(srs)
-
- segment_distance_metre = 10 * 1000
- if srs.IsGeographic():
- dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
- polygon.Segmentize(dfMaxLength)
- elif srs.IsProjected():
- dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
- polygon.Segmentize(dfMaxLength)
-
- return polygon
-
-# Validate the output layer against the provided SRS and creation options
-def validateOutputLayer(lyr, srs=None, options=None):
- ok = True
-
- # ensure the output SRS is equivalent
- if srs is not None:
- srs2 = lyr.GetSpatialRef()
- # cf. apps/ogr2ogr_lib.cpp
- srs_options = [
- 'IGNORE_DATA_AXIS_TO_SRS_AXIS_MAPPING=YES',
- 'CRITERION=EQUIVALENT'
- ]
- if not srs.IsSame(srs2, srs_options):
- logging.warning('Output layer "%s" has SRS %s,\nexpected %s',
- lyr.GetName(),
- srs2.ExportToPrettyWkt(),
- srs.ExportToPrettyWkt())
- ok = False
-
- if options is None:
- return ok
-
- layerDefn = lyr.GetLayerDefn()
- n = layerDefn.GetGeomFieldCount()
- if n != 1:
- logging.warning('Output layer "%s" has %d != 1 geometry fields', layername, n)
-
- geom_type1 = lyr.GetGeomType()
- geom_type2 = options['geometry-type']
- if geom_type1 != geom_type2:
- logging.warning('Output layer "%s" has geometry type #%d (%s), expected #%d (%s)',
- lyr.GetName(),
- geom_type1, ogr.GeometryTypeToName(geom_type1),
- geom_type2, ogr.GeometryTypeToName(geom_type2))
- ok = False
-
- fields = options.get('fields', None)
- if fields is not None:
- for fld in fields:
- fldName = fld['Name']
-
- idx = layerDefn.GetFieldIndex(fldName)
- if idx < 0:
- logging.warning('Output layer "%s" has no field named "%s"',
- lyr.GetName(), fldName)
- ok = False
- continue
- defn = layerDefn.GetFieldDefn(idx)
-
- if 'AlternativeName' in fld:
- v1 = defn.GetAlternativeName()
- v2 = fld['AlternativeName']
- if v1 != v2:
- logging.warning('Field "%s" has AlternativeName="%s", expected "%s"',
- fldName, v1, v2)
- ok = False
-
- if 'Comment' in fld:
- v1 = defn.GetComment()
- v2 = fld['Comment']
- if v1 != v2:
- logging.warning('Field "%s" has Comment="%s", expected "%s"',
- fldName, v1, v2)
- ok = False
-
- if 'Type' in fld:
- v1 = defn.GetType()
- v2 = fld['Type']
- if v1 != v2:
- logging.warning('Field "%s" has Type=%d (%s), expected %d (%s)',
- fldName,
- v1, ogr.GetFieldTypeName(v1),
- v2, ogr.GetFieldTypeName(v2))
- ok = False
-
- if 'SubType' in fld:
- v1 = defn.GetSubType()
- v2 = fld['SubType']
- if v1 != v2:
- logging.warning('Field "%s" has SubType=%d (%s), expected %d (%s)',
- fldName,
- v1, ogr.GetFieldSubTypeName(v1),
- v2, ogr.GetFieldSubTypeName(v2))
- ok = False
-
- if 'TZFlag' in fld:
- v1 = defn.GetTZFlag()
- v2 = fld['TZFlag']
- if v1 != v2:
- logging.warning('Field "%s" has TZFlag=%d (%s), expected %d (%s)',
- fldName, v1, formatTZFlag(v1), v2, formatTZFlag(v2))
- ok = False
-
- if 'Precision' in fld:
- v1 = defn.GetPrecision()
- v2 = fld['Precision']
- if v1 != v2:
- logging.warning('Field "%s" has Precision=%d, expected %d',
- fldName, v1, v2)
- ok = False
-
- if 'Width' in fld:
- v1 = defn.GetWidth()
- v2 = fld['Width']
- if v1 != v2:
- logging.warning('Field "%s" has Width=%d, expected %d',
- fldName, v1, v2)
- ok = False
-
- if 'Default' in fld:
- v1 = defn.GetDefault()
- v2 = fld['Default']
- if v1 != v2:
- logging.warning('Field "%s" has Default="%s", expected "%s"',
- fldName, v1, v2)
- ok = False
-
- if 'Nullable' in fld:
- v1 = bool(defn.IsNullable())
- v2 = fld['Nullable']
- if v1 != v2:
- logging.warning('Field "%s" has Nullable=%s, expected %s',
- fldName, v1, v2)
- ok = False
-
- if 'Unique' in fld:
- v1 = bool(defn.IsUnique())
- v2 = fld['Unique']
- if v1 != v2:
- logging.warning('Field "%s" has Unique=%s, expected %s',
- fldName, v1, v2)
- ok = False
-
- return ok
-
-# Create output layer
-def createOutputLayer(ds, layername, srs=None, options=None):
- logging.info('Creating new destination layer "%s"', layername)
- geom_type = options['geometry-type']
- lco = options.get('options', None)
-
- drv = ds.GetDriver()
- if geom_type != ogr.wkbNone and drv.ShortName == 'PostgreSQL':
- # “Important to set to 2 for 2D layers as it has constraints on the geometry
- # dimension during loading.”
- # — https://gdal.org/drivers/vector/pg.html#layer-creation-options
- if ogr.GT_HasM(geom_type):
- if ogr.GT_HasZ(geom_type):
- dim = 'XYZM'
- else:
- dim = 'XYM'
- elif ogr.GT_HasZ(geom_type):
- dim = '3'
- else:
- dim = '2'
- if lco is None:
- lco = []
- lco = ['dim=' + dim] + lco # prepend dim=
-
- kwargs = { 'geom_type': geom_type }
- if srs is not None:
- kwargs['srs'] = srs
- if lco is not None:
- kwargs['options'] = lco
- logging.debug('CreateLayer(%s, geom_type="%s"%s%s)', layername,
- ogr.GeometryTypeToName(geom_type),
- ', srs="' + kwargs['srs'].GetName() + '"' if 'srs' in kwargs else '',
- ', options=' + str(kwargs['options']) if 'options' in kwargs else '')
- lyr = dso.CreateLayer(layername, **kwargs)
- if lyr is None:
- raise Exception(f'Could not create destination layer "{layername}"')
-
- fields = options['fields']
- if len(fields) > 0 and not lyr.TestCapability(ogr.OLCCreateField):
- raise Exception(f'Destination layer "{layername}" lacks field creation capability')
-
- # set up output schema
- for fld in fields:
- fldName = fld['Name']
- defn = ogr.FieldDefn()
- defn.SetName(fldName)
-
- if 'AlternativeName' in fld:
- v = fld['AlternativeName']
- logging.debug('Set AlternativeName="%s" on output field "%s"', str(v), fldName)
- defn.SetAlternativeName(v)
-
- if 'Comment' in fld:
- v = fld['Comment']
- logging.debug('Set Comment="%s" on output field "%s"', str(v), fldName)
- defn.SetComment(v)
-
- if 'Type' in fld:
- v = fld['Type']
- logging.debug('Set Type=%d (%s) on output field "%s"',
- v, ogr.GetFieldTypeName(v), fldName)
- defn.SetType(v)
-
- if 'SubType' in fld:
- v = fld['SubType']
- logging.debug('Set SubType=%d (%s) on output field "%s"',
- v, ogr.GetFieldSubTypeName(v), fldName)
- defn.SetSubType(v)
-
- if 'TZFlag' in fld:
- v = fld['TZFlag']
- logging.debug('Set TZFlag=%d (%s) on output field "%s"',
- v, formatTZFlag(v), fldName)
- defn.SetTZFlag(v)
-
- if 'Precision' in fld:
- v = fld['Precision']
- logging.debug('Set Precision=%d on output field "%s"', v, fldName)
- defn.SetPrecision(v)
-
- if 'Width' in fld:
- v = fld['Width']
- logging.debug('Set Width=%d on output field "%s"', v, fldName)
- defn.SetWidth(v)
-
- if 'Default' in fld:
- v = fld['Default']
- logging.debug('Set Default=%s on output field "%s"', v, fldName)
- defn.SetDefault(v)
-
- if 'Nullable' in fld:
- v = fld['Nullable']
- logging.debug('Set Nullable=%s on output field "%s"', v, fldName)
- defn.SetNullable(v)
-
- if 'Unique' in fld:
- v = fld['Unique']
- logging.debug('Set Unique=%s on output field "%s"', v, fldName)
- defn.SetUnique(v)
-
- if lyr.CreateField(defn, approx_ok=False) != GDAL_CE_None:
- raise Exception('Could not create field "{fldName}"')
- logging.debug('Added field "%s" to output layer "%s"', fldName, layername)
-
- # flush before calling StartTransaction() so we're not tryingn to
- # rollback changes on a non-existing table
- lyr.SyncToDisk()
- return lyr
-
-# Setup output field mapping, modifying the sources dictionary in place.
-def setOutputFieldMap(defn, sources):
+def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]):
+ """Setup output field mapping, modifying the sources dictionary in place."""
fieldMap = {}
n = defn.GetFieldCount()
for i in range(n):
@@ -771,7 +226,7 @@ def setOutputFieldMap(defn, sources):
for ifld, ofld in fieldMap2.items():
i = fieldMap.get(ofld, None)
if i is None:
- raise Exception(f'Ouput layer has no field named "{ofld}"')
+                raise RuntimeError(f'Output layer has no field named "{ofld}"')
fieldMap2[ifld] = i
source_import['field-map'] = fieldMap2
@@ -785,403 +240,358 @@ def setOutputFieldMap(defn, sources):
rules = [rules]
for idx, rule in enumerate(rules):
if rule is None or not isinstance(rule, dict):
- raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
if 'type' not in rule:
ruleType = rule['type'] = 'literal'
else:
ruleType = rule['type']
if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
- ruleType is None or ruleType not in ['literal', 'regex']):
- raise Exception(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ ruleType is None or ruleType not in ('literal', 'regex')):
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
if ruleType == 'regex':
rule['replace'] = re.compile(rule['replace'])
rules[idx] = ( rule['replace'], rule['with'] )
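
A hypothetical 'value-map' fragment as consumed above: 'type' defaults to 'literal', regex rules are pre-compiled, and each rule is normalized to a (replace, with) tuple.

    # Illustrative source import definition (hypothetical values).
    source_import = {
        'value-map': {
            'status': [
                {'replace': 'Aktiv', 'with': 'active'},  # implicit type: literal
                {'type': 'regex', 'replace': r'Avslutad(es)?', 'with': 'closed'},
            ],
        },
    }
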
-# Escape the given identifier, cf.
-# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()
-def escapeIdentifier(identifier):
- if '\x00' in identifier:
- raise Exception(f'Invalid identifier "{identifier}"')
- # SQL:1999 delimited identifier
- return '"' + identifier.replace('"', '""') + '"'
-
-# Clear the given layer (wipe all its features)
-def clearLayer(ds, lyr):
- n = -1
- if lyr.TestCapability(ogr.OLCFastFeatureCount):
- n = lyr.GetFeatureCount(force=0)
- if n == 0:
- # nothing to clear, we're good
- return
- layername_esc = escapeIdentifier(lyr.GetName())
-
- # XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer
- drv = ds.GetDriver()
- if drv.ShortName == 'PostgreSQL':
- # https://www.postgresql.org/docs/15/sql-truncate.html
- query = 'TRUNCATE TABLE {table} CONTINUE IDENTITY RESTRICT'
- op = 'Truncating'
- else:
- query = 'DELETE FROM {table}'
- op = 'Clearing'
- logging.info('%s table %s (former feature count: %s)', op,
- layername_esc, str(n) if n >= 0 else 'unknown')
- ds.ExecuteSQL(query.format(table=layername_esc))
-
-# Extract an archive file into the given destination directory.
-def extractArchive(path, destdir, fmt=None, patterns=None, exact_matches=None):
- if fmt is None:
- suffix = path.suffix
- if suffix is None or suffix == '' or not suffix.startswith('.'):
- raise Exception(f'Could not infer archive format from "{path}"')
- fmt = suffix.removeprefix('.')
-
- fmt = fmt.lower()
- logging.debug('Unpacking %s archive %s into %s', fmt, path, destdir)
-
- if fmt == 'zip':
- from zipfile import ZipFile
- logging.debug('Opening %s as ZipFile', path)
- with ZipFile(path, mode='r') as z:
- namelist = listArchiveMembers(z.namelist(),
- patterns=patterns, exact_matches=exact_matches)
- z.extractall(path=destdir, members=namelist)
- else:
- raise Exception(f'Unknown archive format "{fmt}"')
-
-# List archive members matching the given parterns and/or exact matches
-def listArchiveMembers(namelist, patterns, exact_matches=None):
- if patterns is None and exact_matches is None:
- # if neither patterns nor exact_matches are given we'll extract the entire archive
- return namelist
- if patterns is None:
- patterns = []
- if exact_matches is None:
- exact_matches = []
-
- members = []
- for name in namelist:
- ok = False
- if name in exact_matches:
- # try exact matches first
- logging.debug('Listed archive member %s (exact match)', name)
- members.append(name)
- ok = True
- continue
- # if there are no exact matches, try patterns one by one in the supplied order
- for pat in patterns:
- if fnmatchcase(name, pat):
- logging.debug('Listed archive member %s (matching pattern "%s")', name, pat)
- members.append(name)
- ok = True
- break
- if not ok:
- logging.debug('Ignoring archive member %s', name)
- return members
-
-# Import a source layer
-def importSource(lyr, path=None, unar=None, args={}, cachedir=None, extent=None):
- if unar is None:
- return importSource2(lyr, str(path), args=args,
- basedir=cachedir, extent=extent)
-
- if cachedir is not None:
- path = cachedir.joinpath(path)
-
- ds_srcpath = Path(args['path'])
- if ds_srcpath.is_absolute():
- # treat absolute paths as relative to the archive root
- logging.warning('%s is absolute, removing leading anchor', ds_srcpath)
- ds_srcpath = ds_srcpath.relative_to(ds_srcpath.anchor)
- ds_srcpath = str(ds_srcpath)
-
- with tempfile.TemporaryDirectory() as tmpdir:
- logging.debug('Created temporary directory %s', tmpdir)
- extractArchive(path, tmpdir,
- fmt=unar.get('format', None),
- patterns=unar.get('patterns', None),
- exact_matches=[ds_srcpath])
- return importSource2(lyr, ds_srcpath, args=args,
- basedir=Path(tmpdir), extent=extent)
-
-# Validate field value mapping
-def setFieldMapValue(fld, idx, val):
- if val is None:
- if not fld.IsNullable():
- logging.warning('Field "%s" is not NULLable but remaps NULL', fld.GetName())
- return None
-
- fldType = fld.GetType()
- if fldType == ogr.OFTInteger or fldType == ogr.OFTInteger64:
- if isinstance(val, int):
- return val
- elif fldType == ogr.OFTString:
- if isinstance(val, str):
- return val
- elif fldType == ogr.OFTBinary:
- if isinstance(val, bytes):
- return val
- elif fldType == ogr.OFTReal:
- if isinstance(val, int):
- return float(val)
- elif isinstance(val, float):
- return val
-
- raise Exception(f'Field "{fld.GetName()}" mapping #{idx} has incompatible type for {ogr.GetFieldTypeName(fldType)}')
-
-# Import a source layer (already extracted)
-# This is more or less like ogr2ogr/GDALVectorTranslate() but we roll
-# out our own (slower) version because GDALVectorTranslate() insists in
-# calling StartTransaction() https://github.com/OSGeo/gdal/issues/3403
-# while we want a single transaction for the entire desination layer,
-# including truncation, source imports, and metadata changes.
-def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
- kwargs, _ = setOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
- path2 = path if basedir is None else str(basedir.joinpath(path))
-
- logging.debug('OpenEx(%s, %s)', path2, str(kwargs))
- ds = gdal.OpenEx(path2, **kwargs)
- if ds is None:
- raise Exception(f'Could not open {path2}')
-
- layername = args.get('layername', None)
- if layername is None:
- idx = 0
- lyr = ds.GetLayerByIndex(idx)
- msg = '#' + str(idx)
- if lyr is not None:
- layername = lyr.GetName()
- msg += ' ("' + layername + '")'
- else:
- lyr = ds.GetLayerByName(layername)
- msg = '"' + layername + '"'
+def processOutputLayer(ds : gdal.Dataset,
+ layername : str,
+ layerdef : dict[str,Any],
+ srs : Optional[osr.SpatialReference] = None,
+ cachedir : Path|None = None,
+ extent : ogr.Geometry|None = None,
+ dsTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> ImportStatus:
+ """Process an output layer."""
+
+ logging.info('Processing output layer "%s"', layername)
+ lyr = ds.GetLayerByName(layername)
if lyr is None:
- raise Exception(f'Could not get requested layer {msg} from {path2}')
-
- logging.info('Importing layer %s from "%s"', msg, path)
- canIgnoreFields = lyr.TestCapability(ogr.OLCIgnoreFields)
-
- srs = lyr.GetSpatialRef()
- if srs is None:
- raise Exception('Source layer has no SRS')
-
- srs_dst = lyr_dst.GetSpatialRef()
- if srs_dst is None:
- logging.warning('Destination has no SRS, skipping coordinate transformation')
- ct = None
- elif srs_dst.IsSame(srs):
- logging.debug('Both source and destination have the same SRS (%s), skipping coordinate transformation',
- srs_dst.GetName())
- ct = None
- else:
- # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0
- # when possible, see apps/ogr2ogr_lib.cpp
- logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)',
- srs.GetName(), srs_dst.GetName())
- ct = osr.CoordinateTransformation(srs, srs_dst)
- if ct is None:
- raise Exception(f'Could not create transformation from source SRS ({srs.GetName()}) '
- + f'to destination SRS ({srs_dst.GetName()})')
-
- defn = lyr.GetLayerDefn()
- geomFieldCount = defn.GetGeomFieldCount()
- if geomFieldCount != 1: # TODO Add support for multiple geometry fields
- logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
+ raise RuntimeError(f'Failed to create output layer "{layername}"??')
+ if not lyr.TestCapability(ogr.OLCSequentialWrite):
+ raise RuntimeError(f'Output layer "{layername}" has no working '
+ 'CreateFeature() method')
+
+ sources = layerdef['sources']
+ if not (lyrcache is None or force or
+ areSourceFilesNewer(layername, sources=sources,
+ lyrcache=lyrcache,
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ return ImportStatus.IMPORT_NOCHANGE
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
+
+ description = layerdef.get('description', None)
+ if (description is not None and
+ lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+ logging.warning('Could not set description metadata')
+
+ # setup output field mapping in the sources dictionary
+ setOutputFieldMap(lyr.GetLayerDefn(), sources)
+
+ return importSources(dso=ds, lyr=lyr, sources=sources,
+ cachedir=cachedir, extent=extent,
+ dsoTransaction=dsTransaction,
+ lyrcache=lyrcache,
+ force=force)
+
+def validate_sources(layers : dict[str, Any]) -> None:
+ """Mangle and validate layer sources and import definitions"""
+ toremove = set()
+ for layername, layerdefs in layers.items():
+ sources = layerdefs.get('sources', None)
+ if sources is None or len(sources) < 1:
+ logging.warning('Output layer "%s" has no definition, skipping', layername)
+ toremove.add(layername)
+ continue
- fieldCount = defn.GetFieldCount()
- fieldMap = [-1] * fieldCount
- fields = args['field-map']
- fieldSet = set()
+ for idx, layerdef in enumerate(sources):
+ importdef = layerdef.get('import', None)
+ if importdef is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import definition')
- for i in range(fieldCount):
- fld = defn.GetFieldDefn(i)
- fldName = fld.GetName()
- fieldMap[i] = v = fields.get(fldName, -1)
- fieldSet.add(fldName)
-
- if v < 0 and canIgnoreFields:
- # call SetIgnored() on unwanted source fields
- logging.debug('Set Ignored=True on output field "%s"', fldName)
- fld.SetIgnored(True)
-
- count0 = -1
- if lyr.TestCapability(ogr.OLCFastFeatureCount):
- count0 = lyr.GetFeatureCount(force=0)
-
- if count0 == 0 and len(fieldSet) == 0:
- # skip the below warning in some cases (e.g., GeoJSON source)
- logging.info('Source layer "%s" has no fields nor features, skipping', layername)
- return
-
- logging.debug('Field map: %s', str(fieldMap))
- for fld in fields:
- if not fld in fieldSet:
- logging.warning('Source layer "%s" has no field named "%s", ignoring', layername, fld)
-
- count1 = -1
- if args.get('spatial-filter', True) and extent is not None:
- if extent.GetSpatialReference().IsSame(srs):
- extent2 = extent
- else:
- extent2 = extent.Clone()
- if extent2.TransformTo(srs) != ogr.OGRERR_NONE:
- raise Exception(f'Could not transform extent {extent.ExportToWkt()} to {srs.GetName()}')
+ sourcedef = layerdef.get('source', None)
+ unar = None if sourcedef is None else sourcedef.get('unar', None)
+ src = None if sourcedef is None else sourcedef.get('path', None)
- #logging.debug('Applying extent: %s', extent2.ExportToWkt())
- lyr.SetSpatialFilter(extent2)
+ ds_srcpath = importdef.get('path', None)
+ if src is None and unar is None and ds_srcpath is not None:
+                # fallback to import:path if there is no unarchiving recipe
+ src = ds_srcpath
+ if unar is not None and ds_srcpath is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import source path')
+ if src is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no source path')
+ layerdef['source'] = { 'path': src, 'unar': unar }
- if lyr.TestCapability(ogr.OLCFastFeatureCount):
- count1 = lyr.GetFeatureCount(force=0)
+ for layername in toremove:
+ layers.pop(layername)
+
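
A hypothetical configuration fragment illustrating the fallback above: with no unarchiving recipe, the import path doubles as the source path.

    layers = {
        'my_layer': {
            'sources': [{
                # no 'source' section: validate_sources() falls back to import:path
                'import': {'path': 'data/my_layer.geojson'},
            }],
        },
    }
    validate_sources(layers)
    assert layers['my_layer']['sources'][0]['source'] == {
        'path': 'data/my_layer.geojson', 'unar': None}
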
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ unique : Optional[bool] = None,
+ nullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the layer cache table."""
+ n = defn.GetFieldCount()
+ if idx >= n:
+ return False
+ defn = defn.GetFieldDefn(idx)
+
+ b = True
+ name2 = defn.GetName()
+ if name2 != name:
+ logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
+ b = False
+
+ if nullable is not None and defn.IsNullable() != nullable:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+ if unique is not None and defn.IsUnique() != unique:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+ idx, name2, 'is' if defn.IsUnique() else 'isn\'t')
+
+ typ2 = defn.GetType()
+ if typ2 != typ:
+ logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
+ ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+ b = False
+
+ subtyp2 = defn.GetSubType()
+ if subtyp2 != subtyp:
+ logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
+ ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+ b = False
+
+ width2 = defn.GetWidth()
+ if width2 != 0 and (width == 0 or width2 < width):
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
+ idx, name2, width2, width)
+ return b
+
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+ """Validate layer cache table."""
+ drvName = ds.GetDriver().ShortName
+ if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
+ logging.warning('Unsupported cache layer for output driver %s', drvName)
+ return False
+ lyr = ds.GetLayerByName(name)
+ if lyr is None:
+ logging.warning('Table "%s" does not exist', name)
+ return False
- if count0 >= 0:
- if count1 >= 0:
- logging.info('Source layer "%s" has %d features (of which %d within extent)',
- layername, count0, count1)
- else:
- logging.info('Source layer "%s" has %d features', layername, count0)
-
- # build a list of triplets (field index, replacement_for_null, [(from_value, to_value), …])
- valueMap = []
- for fldName, rules in args.get('value-map', {}).items():
- i = defn.GetFieldIndex(fldName)
- if i < 0:
- raise Exception(f'Source layer "{layername}" has no field named "{fldName}"')
- if fieldMap[i] < 0:
- logging.warning('Ignored source field "%s" has value map', fldName)
- continue
+# if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+# gdalVersionMin(maj=3, min=7) and
+# lyr.TestCapability(ogr.OLCUpdateFeature)):
+# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+# 'ignoring cache', name)
+# return None
- hasNullReplacement = False
- nullReplacement = None
- mapping = []
- fld = defn.GetFieldDefn(i)
- for idx, (rFrom, rTo) in enumerate(rules):
- # use fld for both 'from' and 'to' (the types must match, casting is not allowed in the mapping)
- if rFrom is None:
- if hasNullReplacement:
- logging.warning('Field "%s" has duplicate NULL replacement', fld.GetName())
- else:
- setFieldMapValue(fld, idx, None) # validate NULL
- rTo = setFieldMapValue(fld, idx, rTo)
- hasNullReplacement = True
- nullReplacement = rTo
- elif isinstance(rFrom, re.Pattern):
- # validate but keep the rFrom regex
- setFieldMapValue(fld, idx, str(rFrom))
- rTo = setFieldMapValue(fld, idx, rTo)
- mapping.append( (rFrom, rTo, 1) )
- else:
- rFrom = setFieldMapValue(fld, idx, rFrom)
- rTo = setFieldMapValue(fld, idx, rTo)
- mapping.append( (rFrom, rTo, 0) )
-
- if nullReplacement is not None or len(mapping) > 0:
- valueMap.append( (i, nullReplacement, mapping) )
-
- bValueMap = len(valueMap) > 0
- defn = None
-
- defn_dst = lyr_dst.GetLayerDefn()
- eGType_dst = defn_dst.GetGeomType()
- eGType_dst_HasZ = ogr.GT_HasZ(eGType_dst)
- eGType_dst_HasM = ogr.GT_HasM(eGType_dst)
- dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown
-
- if bValueMap:
- valueMapCounts = [0] * fieldCount
-
- featureCount = 0
- mismatch = {}
- feature = lyr.GetNextFeature()
- while feature is not None:
- if bValueMap:
- for i, nullReplacement, mapping in valueMap:
- if not feature.IsFieldSet(i):
- continue
- elif feature.IsFieldNull(i):
- if nullReplacement is not None:
- # replace NULL with non-NULL value
- feature.SetField(i, nullReplacement)
- valueMapCounts[i] += 1
- continue
+ defn = lyr.GetLayerDefn()
+ fields = [
+ { 'name': 'layername', 'typ': ogr.OFTString,
+ 'nullable': False, 'unique': True, 'width': 255 },
+ { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+ 'nullable': False },
+ { 'name': 'fingerprint', 'typ': ogr.OFTBinary,
+ 'nullable': False, 'width': 32 },
+ ]
+ m = len(fields)
+ n = defn.GetFieldCount()
+ if n < m:
+ # this is fatal, and `all(bs)` is False so we return False below
+ logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
+ elif n != m:
+ logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+ bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+ if not all(bs):
+ return False
- v = feature.GetField(i)
- for rFrom, rTo, rType in mapping:
- if rType == 0:
- # literal
- if v != rFrom:
- continue
- elif rType == 1:
- # regex
- m = rFrom.fullmatch(v)
- if m is None:
- continue
- elif rTo is not None:
- rTo = rTo.format(*m.groups())
- else:
- raise Exception(str(rType))
+ n = defn.GetGeomFieldCount()
+ if n > 0:
+ geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+ for i in range(n) ]
+ logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
+ name, n, ', '.join(geomFieldNames))
+
+ if gdalVersionMin(maj=3, min=5):
+ style = lyr.GetStyleTable()
+ if style is not None:
+ logging.warning('Layer cache "%s" has a style table "%s"',
+ name, style.GetLastStyleName())
+ return True
+
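
The cache table itself is not created here; a hypothetical PostgreSQL DDL matching the schema validated above (the table name 'layercache' and the column types are assumptions inferred from the fields list) could look like:

    # Hypothetical sketch, assuming a PostgreSQL output dataset `ds`.
    ds.ExecuteSQL('''
        CREATE TABLE IF NOT EXISTS layercache (
            layername    varchar(255) UNIQUE NOT NULL,
            last_updated timestamptz NOT NULL,
            fingerprint  bytea NOT NULL  -- 32-byte digest, e.g. SHA-256
        )''')
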
+def areSourceFilesNewer(layername : str,
+ sources : dict[str,Any],
+ lyrcache : ogr.Layer,
+ cachedir : Optional[Path] = None) -> bool:
+    """Return a boolean indicating whether any source file on disk is newer
+    than the layer cache, i.e., whether the cache is stale.  The cache is up
+    to date when the last modification and last status change time of each
+    source file are equal to or lower than the `last_updated` value found in
+    the layer cache."""
+
+ source_paths = set()
+ for source in sources:
+ # the same source_path can be used multiple times, stat(2) only once
+ source_path = source['source']['path']
+ source_paths.add(source_path)
+ if len(source_paths) == 0:
+ return False
- if rTo is None:
- # replace non-NULL value with NULL
- feature.SetFieldNull(i)
- else:
- # replace non-NULL value with non-NULL value
- feature.SetField(i, rTo)
- valueMapCounts[i] += 1
- break
-
- feature2 = ogr.Feature(defn_dst)
- feature2.SetFromWithMap(feature, False, fieldMap)
-
- geom = feature2.GetGeometryRef()
- if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE:
- raise Exception('Could not apply coordinate transformation')
-
- eGType = geom.GetGeometryType()
- if eGType != eGType_dst and not dGeomIsUnknown:
- # Promote to multi, cf. apps/ogr2ogr_lib.cpp:ConvertType()
- eGType2 = eGType
- if eGType == ogr.wkbTriangle or eGType == ogr.wkbTIN or eGType == ogr.wkbPolyhedralSurface:
- eGType2 = ogr.wkbMultiPolygon
- elif not ogr.GT_IsSubClassOf(eGType, ogr.wkbGeometryCollection):
- eGType2 = ogr.GT_GetCollection(eGType)
-
- eGType2 = ogr.GT_SetModifier(eGType2, eGType_dst_HasZ, eGType_dst_HasM)
- if eGType2 == eGType_dst:
- mismatch[eGType] = mismatch.get(eGType, 0) + 1
- geom = ogr.ForceTo(geom, eGType_dst)
- # TODO call MakeValid()?
- else:
- raise Exception(f'Conversion from {ogr.GeometryTypeToName(eGType)} '
- f'to {ogr.GeometryTypeToName(eGType_dst)} not implemented')
- feature2.SetGeometryDirectly(geom)
-
- if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
- raise Exception(f'Could not transfer source feature #{feature.GetFID()}')
-
- featureCount += 1
- feature = lyr.GetNextFeature()
-
- if bValueMap:
- valueMapCounts = [ (lyr.GetLayerDefn().GetFieldDefn(i).GetName(), k) for i,k in enumerate(valueMapCounts) if k > 0 ]
-
- lyr = None
- logging.info('Imported %d features from source layer "%s"', featureCount, layername)
-
- if bValueMap:
- if len(valueMapCounts) > 0:
- valueMapCounts = ', '.join([ str(k) + '× "' + n + '"' for n,k in valueMapCounts ])
+ t = None
+ mtimes_ns = {}
+ for source_path in source_paths:
+ path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ mtimes_ns[source_path] = st.st_mtime_ns
+            # take max(mtime, ctime): if we lock source paths, any update after
+            # acquiring the lock will yield a value larger than time.time_ns()
+ t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+ if t is None or t < t2:
+ t = t2
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ return True
+ assert t is not None
+
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
+ lyrcache.SetAttributeFilter(attributeFilter)
+
+ feature = lyrcache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ return True
+
+ if not feature.IsFieldSetAndNotNull(1):
+ ret = True
+ else:
+ # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+ # [ year, month, day, hour, minute, second, timezone flag ]
+ dt = feature.GetFieldAsDateTime(1)
+ if not gdalVersionMin(maj=3, min=8):
+ tz = None # assume local time
+ elif dt[6] == ogr.TZFLAG_UNKNOWN:
+ logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+ 'field #%d "%s", assuming local time', 1,
+ feature.GetDefnRef().GetFieldDefn(1).GetName())
+ tz = None
+ elif dt[6] == ogr.TZFLAG_LOCALTIME:
+ tz = None
+ elif dt[6] == ogr.TZFLAG_UTC:
+ tz = UTC
else:
- valueMapCounts = '-'
- logging.info('Field substitutions: %s', valueMapCounts)
-
- if len(mismatch) > 0:
- mismatches = [ str(n) + '× ' + ogr.GeometryTypeToName(t)
- for t,n in sorted(mismatch.items(), key=lambda x: x[1]) ]
- logging.info('Forced conversion to %s: %s',
- ogr.GeometryTypeToName(eGType_dst), ', '.join(mismatches))
-
+ tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+ ms, s = modf(dt[5])
+ dt = datetime(
+ year=dt[0], # including century
+ month=dt[1], # 01 ≤ year ≤ 12
+ day=dt[2], # 01 ≤ day ≤ 31
+ hour=dt[3], # 00 ≤ hour ≤ 23
+ minute=dt[4], # 00 ≤ minute ≤ 59
+ second=int(s), # 00 ≤ second ≤ 59
+ microsecond=round(ms*1000000),
+ tzinfo=tz
+ )
+ fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
+ logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
+ layername,
+ dt.isoformat(timespec='microseconds'),
+ fpr.hex() if fpr is not None else 'NULL')
+        # nanosecond comparison: cached last_updated (truncated to µs) vs.
+        # the newest source mtime/ctime gathered above
+        ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+ if lyrcache.GetNextFeature() is not None:
+ raise RuntimeError(f'Duplicate key {layername}')
+
+ if not ret:
+ for source_path, mtime_ns in sorted(mtimes_ns.items()):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtime_ns // 1000) / 1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return ret
+
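
As a worked example of the timezone-flag arithmetic above: OGR encodes UTC as flag 100 (ogr.TZFLAG_UTC) and each unit as a 15-minute (900 s) step, so a hypothetical flag of 104 decodes to UTC+01:00.

    from datetime import timedelta, timezone
    from osgeo import ogr  # TZFLAG_* constants require GDAL ≥3.8

    tzflag = 104  # hypothetical value read from the cache
    tz = timezone(offset=timedelta(seconds=(tzflag - ogr.TZFLAG_UTC) * 900))
    print(tz)  # UTC+01:00
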
+def getLastMTimes(layerdefs : dict[str,Any], basedir : Optional[Path] = None) -> dict[str,int]:
+    """Return a dict mapping source paths to their last modification time
+    (as a timestamp in milliseconds)."""
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ if source_path in ret:
+ continue
+ path = source_path if basedir is None else str(basedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ ret[source_path] = st.st_mtime_ns // 1000000
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ pass
+ return ret
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors.  We could do that one layerdef at a time (one output layer at
+    a time) to reduce the time during which the sources are prevented from
+    being updated/downloaded, but there is some value in having consistency
+    across the whole import process."""
+ umask = os.umask(0o002)
+ lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+ try:
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ if source_path in ret:
+ continue
+ lockfile = getSourcePathLockFileName(source_path)
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+ dir_fd=lockdir_fd)
+ logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+ source_path, lockfile)
+ flock(lock_fd, LOCK_SH)
+ ret[source_path] = lock_fd
+ return ret
+ finally:
+ try:
+ os.close(lockdir_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not close lockdir')
+ os.umask(umask)
+
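
For context, the matching writer side (e.g. the download job) would take an exclusive lock on the same per-source lock file before replacing the file; a hypothetical sketch, reusing getSourcePathLockFileName() from common:

    import os
    from fcntl import flock, LOCK_EX

    # Hypothetical lock directory; mirrors lockSourcePaths() above.
    lockdir_fd = os.open('/run/lock/webmap',
                         os.O_RDONLY | os.O_CLOEXEC | os.O_PATH | os.O_DIRECTORY)
    lock_fd = os.open(getSourcePathLockFileName('data/my_layer.geojson'),
                      os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, mode=0o664,
                      dir_fd=lockdir_fd)
    try:
        flock(lock_fd, LOCK_EX)  # blocks while importers hold LOCK_SH
        ...                      # atomically replace the source file here
    finally:
        os.close(lock_fd)        # closing the descriptor releases the lock
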
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+ """Release shared locks on the source paths. Closed FDs are removed from
+ the dictionary in place."""
+ toremove = set()
+ for path, lock_fd in lock_fds.items():
+ try:
+ os.close(lock_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not release lock for %s', path)
+ else:
+ logging.debug('Released lock for %s', path)
+ toremove.add(path)
+ for path in toremove:
+ lock_fds.pop(path)
-if __name__ == '__main__':
+# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
+def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
@@ -1189,135 +599,181 @@ if __name__ == '__main__':
help=f'cache directory (default: {os.curdir})')
parser.add_argument('--debug', action='count', default=0,
help=argparse.SUPPRESS)
+ parser.add_argument('--lockfile', default=None,
+ help='obtain an exclusive lock before processing')
+ parser.add_argument('--lockdir-sources', default=None,
+ help='optional directory for lock files to source paths')
+ parser.add_argument('--mvtdir', default=None,
+ help='optional directory for Mapbox Vector Tiles (MVT)')
+ parser.add_argument('--mvt-compress', default=False, action='store_true',
+ help='whether to compress Mapbox Vector Tiles (MVT) files')
+ parser.add_argument('--force', default=False, action='store_true',
+ help='import even if no new changes were detected')
parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
args = parser.parse_args()
- if args.debug > 0:
+ if args.debug > 0: # pylint: disable=duplicate-code
logging.getLogger().setLevel(logging.DEBUG)
if args.debug > 1:
gdal.ConfigurePythonLogging(enable_debug=True)
- common.load_config(groupnames=None if args.groupname == [] else args.groupname)
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
# validate configuration
- if 'dataset' not in common.config:
- raise Exception('Configuration does not specify output dataset')
-
- layers = common.config.get('layers', {})
- for layername, layerdefs in layers.items():
- for idx, layerdef in enumerate(layerdefs['sources']):
- importdef = layerdef.get('import', None)
- if importdef is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no import definition')
-
- sourcedef = layerdef.get('source', None)
- unar = None if sourcedef is None else sourcedef.get('unar', None)
- src = None if sourcedef is None else sourcedef['cache'].get('path', None)
+ if 'dataset' not in config:
+ raise BadConfiguration('Configuration does not specify output dataset')
- ds_srcpath = importdef.get('path', None)
- if src is None and unar is None and ds_srcpath is not None:
- # fallback to importe:path if there is no unarchiving receipe
- src = Path(ds_srcpath)
- if unar is not None and ds_srcpath is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no import source path')
- if src is None:
- raise Exception(f'Output layer "{layername}" source #{idx} has no source path')
- layerdef['source'] = { 'path': src, 'unar': unar }
+ layers = config.get('layers', {})
+ validate_sources(layers)
# set global GDAL/OGR configuration options
- for pszKey, pszValue in common.config.get('GDALconfig', {}).items():
+ for pszKey, pszValue in config.get('GDALconfig', {}).items():
logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
gdal.SetConfigOption(pszKey, pszValue)
# open output dataset (possibly create it first)
- dso = openOutputDS(common.config['dataset'])
+ dso = openOutputDS(config['dataset'])
- validateSchema(layers,
- drvo=dso.GetDriver(),
- lco_defaults=common.config['dataset'].get('create-layer-options', None))
+ validate_schema(layers,
+ drvo=dso.GetDriver(),
+ lco_defaults=config['dataset'].get('create-layer-options', None))
# get configured Spatial Reference System and extent
- srs = getSRS(common.config.get('SRS', None))
- extent = getExtent(common.config.get('extent', None), srs=srs)
+ srs = getSRS(config.get('SRS', None))
+ extent = getExtent(config.get('extent', None), srs=srs)
- cachedir = Path(args.cachedir) if args.cachedir is not None else None
- rv = 0
+ if args.lockfile is not None:
+ # obtain an exclusive lock and don't release it until exiting the program
+ lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
+ logging.debug('flock("%s", LOCK_EX)', args.lockfile)
+ flock(lock_fd, LOCK_EX)
+
+ # create all output layers before starting the transaction
for layername, layerdef in layers.items():
- sources = layerdef['sources']
- if sources is None or len(sources) < 1:
- logging.warning('Output layer "%s" has no definition, skipping', layername)
+ lyr = dso.GetLayerByName(layername)
+ if lyr is not None:
+ # TODO dso.DeleteLayer(layername) if --overwrite and
+ # dso.TestCapability(ogr.ODsCDeleteLayer)
+ # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
continue
+ if not dso.TestCapability(ogr.ODsCCreateLayer):
+ raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
+ 'support layer creation')
+ createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
+
+ if args.mvtdir is not None:
+ args.mvtdir = Path(args.mvtdir)
+ if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it
+ raise RuntimeError('Invalid value for --mvtdir')
+ args.mvtdir.parent.mkdir(parents=True, exist_ok=True)
+
+ if args.cachedir is not None:
+ args.cachedir = Path(args.cachedir)
+ if args.lockdir_sources is None:
+ sourcePathLocks = None
+ else:
+ sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+ lockdir=args.lockdir_sources)
+
+ if (dso.TestCapability(ogr.ODsCTransactions) and
+ # we need SAVEPOINT support
+ dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
+ logging.debug('Starting transaction')
+ dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
+ else:
+ logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
+ dso.GetDriver().ShortName)
+ dsoTransaction = False
+
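
SAVEPOINTs are what make a single dataset-level transaction safe here: a failed layer import can be undone without aborting the work already committed for earlier layers. A hypothetical sketch of the pattern (importSources() presumably does something along these lines):

    dso.ExecuteSQL('SAVEPOINT layer_import')
    try:
        ...  # import one output layer
    except Exception:
        dso.ExecuteSQL('ROLLBACK TO SAVEPOINT layer_import')
        raise
    else:
        dso.ExecuteSQL('RELEASE SAVEPOINT layer_import')
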
+ # validate layer cache
+ lyr_cache = config['dataset'].get('layercache', None)
+ if lyr_cache is None:
+ pass
+ elif validateCacheLayer(dso, lyr_cache):
+ lyr_cache = dso.GetLayerByName(lyr_cache)
+ else:
+ if not args.force:
+ logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+ args.force = True
+ lyr_cache = None
- logging.info('Processing output layer "%s"', layername)
- transaction = False
- try:
- # get output layer
- outLayerIsNotEmpty = True
- lco = layerdef.get('create', None)
- lyr = dso.GetLayerByName(layername)
- if lyr is not None:
- # TODO dso.DeleteLayer(layername) if --overwrite and dso.TestCapability(ogr.ODsCDeleteLayer)
- # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
- validateOutputLayer(lyr, srs=srs, options=lco)
- # TODO bail out if all source files are older than lyr's last_change
- elif not dso.TestCapability(ogr.ODsCCreateLayer):
- raise Exception(f'Output driver {dso.GetDriver().ShortName} does not support layer creation')
- elif lco is None or len(lco) < 1:
- raise Exception(f'Missing schema for new output layer "{layername}"')
- else:
- lyr = createOutputLayer(dso, layername, srs=srs, options=lco)
- outLayerIsNotEmpty = False
-
- if not lyr.TestCapability(ogr.OLCSequentialWrite):
- raise Exception(f'Output layer "{layername}" has no working CreateFeature() method')
-
- # setup output field mapping in the sources dictionary
- setOutputFieldMap(lyr.GetLayerDefn(), sources)
-
- # start transaction if possible
- if lyr.TestCapability(ogr.OLCTransactions):
- logging.debug('Starting transaction')
- transaction = lyr.StartTransaction() == ogr.OGRERR_NONE
- else:
- logging.warning('Unsafe update, output layer "%s" does not support transactions', layername)
-
- if outLayerIsNotEmpty:
- # clear non-empty output layer
- clearLayer(dso, lyr)
-
- description = layerdef.get('description', None)
- if description is not None and lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None:
- logging.warning('Could not set description metadata')
-
- for source in sources:
- # import source layers
- importSource(lyr, **source['source'], args=source['import'],
- cachedir=cachedir, extent=extent)
-
- if transaction:
- # commit transaction
- logging.debug('Committing transaction')
- transaction = False
- if lyr.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
- rv = 1
-
- except Exception:
- if transaction:
- logging.error('Exception occured in transaction, rolling back')
- try:
- if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
+ rv = 0
+ try:
+ r = {}
+ n = 0
+ start = time_monotonic()
+ for layername, layerdef in layers.items():
+ r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
+ srs=srs,
+ cachedir=args.cachedir,
+ extent=extent,
+ dsTransaction=dsoTransaction,
+ lyrcache=lyr_cache,
+ force=args.force)
+ n += 1
+ logging.info('Import result status for layer "%s": %s', layername, str(r0))
+ if r0 == ImportStatus.IMPORT_ERROR:
+ rv = 1
+ if dsoTransaction:
+ dsoTransaction = False
+ logging.debug('Rolling back transaction')
+ # no need to catch the exception here
+                    if dso.RollbackTransaction() != ogr.OGRERR_NONE:
logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
- logging.exception('Could not import layer "%s"', layername)
- rv = 1
-
- finally:
- # close output layer
- lyr = None
-
- dso = None
- srs = None
- extent = None
- exit(rv)
+ break
+ elapsed = time_monotonic() - start
+ logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))
+
+ # get mtimes before releasing the source locks
+ last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
+
+ if sourcePathLocks is not None:
+ releaseSourcePathLocks(sourcePathLocks)
+
+ export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
+ if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
+ pass
+ elif len(export_layers) == 0:
+ logging.warning('--mvtdir option used but no layer has a publication definition')
+ elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
+ and args.mvtdir.is_dir()):
+ logging.info('Skipping MVT export for group %s (no changes)',
+ ', '.join(args.groupname) if args.groupname is not None else '*')
+ else:
+ exportMVT(dso,
+ layers=export_layers,
+ sources=parse_config_dl(config.get('downloads', [])),
+ license_info=config.get('license-info', {}),
+ last_modified=last_modified,
+ dst=args.mvtdir,
+ default_options=config.get('vector-tiles', None),
+ compress=args.mvt_compress)
+
+ if dsoTransaction:
+ dsoTransaction = False
+ logging.debug('Committing transaction')
+ if dso.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
+ rv = 1
+
+ except Exception: # pylint: disable=broad-exception-caught
+ if dsoTransaction:
+            logging.exception('Exception occurred within transaction, rolling back')
+ try:
+ if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction')
+ else:
+ traceback.print_exc()
+ sys.exit(1)
+
+ finally:
+ lyr_cache = None
+ dso = None
+ extent = None
+ srs = None
+ sys.exit(rv)
+
+gdal.UseExceptions()
+main()