author    Guilhem Moulin <guilhem@fripost.org>  2026-03-06 10:52:43 +0100
committer Guilhem Moulin <guilhem@fripost.org>  2026-03-06 16:06:41 +0100
commit    ca91a579770c89d25aefae220079bf336fa88dc9
tree      cb1f49adacf12b0cb15b6430b0446fbee2135814 /geodata-import
parent    94438a900d3fb933a33aed4d2ffeb8809e966c46
Rename "webmap" to the less generic "geodata".
The database has uses beyond the webmap.
Diffstat (limited to 'geodata-import')
-rwxr-xr-x  geodata-import  794
1 file changed, 794 insertions(+), 0 deletions(-)
diff --git a/geodata-import b/geodata-import
new file mode 100755
index 0000000..2f0f5b4
--- /dev/null
+++ b/geodata-import
@@ -0,0 +1,794 @@
+#!/usr/bin/python3
+
+#----------------------------------------------------------------------
+# Backend utilities for the Klimatanalys Norr project (extract/import layers)
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#----------------------------------------------------------------------
+
+# pylint: disable=invalid-name, missing-module-docstring, fixme
+
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
+import os
+from stat import S_ISREG
+import sys
+from fcntl import flock, LOCK_EX, LOCK_SH
+import logging
+import argparse
+import re
+from datetime import datetime, timedelta, timezone, UTC
+from math import modf
+from pathlib import Path
+from time import monotonic as time_monotonic
+from typing import Any, Optional, NoReturn
+from collections.abc import Iterable
+import traceback
+
+from osgeo import gdal, ogr, osr
+from osgeo.gdalconst import (
+ CE_None as GDAL_CE_None,
+ DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
+ DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
+ DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
+)
+from osgeo import gdalconst
+
+import common
+from common import (
+ BadConfiguration,
+ parse_config_dl,
+ escape_identifier,
+ escape_literal_str,
+ getSourcePathLockFileName
+)
+from common_gdal import (
+ gdalGetMetadataItem,
+ getSRS,
+ getExtent,
+ parseGeomType,
+ parseFieldType,
+ parseSubFieldType,
+ parseTimeZone
+)
+from import_source import (
+ openOutputDS,
+ createOutputLayer,
+ validateOutputLayer,
+ importSources,
+ ImportStatus
+)
+from export_mvt import exportMVT
+from export_raster import processRaster
+
+def setFieldIf(cond : bool,
+ attrName : str,
+ val : Any,
+ data : dict[str, Any],
+ fldName : str,
+ drvName : str,
+ log = logging.warning) -> None:
+ """Conditionally set a field"""
+ if cond:
+ data[attrName] = val
+ else:
+ if isinstance(val, str):
+ val2 = '"' + val + '"'
+ else:
+ val2 = str(val)
+ log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
+ attrName, val2, fldName, drvName)
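+
+# Illustration with hypothetical values: when the driver lacks the capability,
+#   setFieldIf(False, 'Width', 254, fld_def2, 'name', 'ESRI Shapefile')
+# leaves fld_def2 untouched and logs
+#   Ignoring Width=254 on field "name" (not supported by ESRI Shapefile driver)
+# whereas a True condition simply sets fld_def2['Width'] = 254.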
+
+# pylint: disable-next=too-many-branches
+def validate_schema(layers : dict[str, Any],
+                    drvo : gdal.Driver,
+                    lco_defaults : Optional[dict[str, str]] = None) -> None:
+    """Validate layer creation options and schema. The schema is
+    modified in place with the parsed result. (The output dataset's
+    driver is needed to determine which field definition flags and
+    constraints it supports.)"""
+
+ # list of capability flags supported by the CreateField() API
+ drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
+ drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
+ drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
+
+ # cache driver capabilities
+ drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
+ drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
+ drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
+ drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
+ gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
+ drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags
+
+ for layername, layerdef in layers.items():
+ create = layerdef.get('create', None)
+ if create is None or len(create) < 1:
+ logging.warning('Layer "%s" has no creation schema', layername)
+ continue
+
+ # prepend global layer creation options (dataset:create-layer-options)
+ # and build the option=value list
+ lco = create.get('options', None)
+ if lco_defaults is not None or lco is not None:
+ options = []
+ if lco_defaults is not None:
+ options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
+ if lco is not None:
+ options += [ k + '=' + str(v) for k, v in lco.items() ]
+ create['options'] = options
+
+ # parse geometry type
+ create['geometry-type'] = parseGeomType(create.get('geometry-type', None))
+
+ fields = create.get('fields', None)
+ if fields is None:
+ create['fields'] = []
+ else:
+ fields_set = set()
+ for idx, fld_def in enumerate(fields):
+ fld_name = fld_def.get('name', None)
+ if fld_name is None or fld_name == '':
+ raise BadConfiguration(f'Field #{idx} has no name')
+ if fld_name in fields_set:
+ raise BadConfiguration(f'Duplicate field "{fld_name}"')
+ fields_set.add(fld_name)
+
+ fld_def2 = { 'Name': fld_name }
+ for k, v in fld_def.items():
+ k2 = k.lower()
+ if k2 == 'name':
+ pass
+ elif k2 in ('alternativename', 'alias'):
+ setFieldIf(drvoSupportsFieldAlternativeName,
+ 'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
+ log=logging.debug)
+ elif k2 == 'comment':
+ setFieldIf(drvoSupportsFieldComment,
+ 'Comment', v, fld_def2, fld_name, drvo.ShortName,
+ log=logging.debug)
+
+ elif k2 == 'type':
+ fld_def2['Type'] = parseFieldType(v)
+ elif k2 == 'subtype':
+ fld_def2['SubType'] = parseSubFieldType(v)
+ elif k2 == 'tz':
+ fld_def2['TZFlag'] = parseTimeZone(v)
+ elif k2 == 'width' and v is not None and isinstance(v, int):
+ setFieldIf(drvoSupportsFieldWidthPrecision,
+ 'Width', v, fld_def2, fld_name, drvo.ShortName)
+ elif k2 == 'precision' and v is not None and isinstance(v, int):
+ setFieldIf(drvoSupportsFieldWidthPrecision,
+ 'Precision', v, fld_def2, fld_name, drvo.ShortName)
+
+ # constraints
+ elif k2 == 'default':
+ setFieldIf(drvoSupportsFieldDefault,
+ 'Default', v, fld_def2, fld_name, drvo.ShortName)
+ elif k2 == 'nullable' and v is not None and isinstance(v, bool):
+ setFieldIf(drvoSupportsFieldNullable,
+ 'Nullable', v, fld_def2, fld_name, drvo.ShortName)
+ elif k2 == 'unique' and v is not None and isinstance(v, bool):
+ setFieldIf(drvoSupportsFieldUnique,
+ 'Unique', v, fld_def2, fld_name, drvo.ShortName)
+ else:
+ raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')
+
+ fields[idx] = fld_def2
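+
+# A minimal creation schema accepted by validate_schema(), sketched as YAML
+# (layer and field names are hypothetical, structure inferred from the
+# parsing logic above):
+#
+#   layers:
+#     my_layer:
+#       create:
+#         geometry-type: MultiPolygon
+#         options:
+#           OVERWRITE: 'YES'
+#         fields:
+#           - name: id
+#             type: Integer64
+#             nullable: false
+#             unique: true
+#           - name: name
+#             type: String
+#             width: 254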
+
+def setOutputFieldMap(defn : ogr.FeatureDefn, sources : list[dict[str, Any]]):
+ """Setup output field mapping, modifying the sources dictionary in place."""
+ fieldMap = {}
+ n = defn.GetFieldCount()
+ for i in range(n):
+ fld = defn.GetFieldDefn(i)
+ fldName = fld.GetName()
+ fieldMap[fldName] = i
+
+ for source in sources:
+ source_import = source['import']
+
+ fieldMap2 = source_import.get('field-map', None)
+ if fieldMap2 is None:
+ fieldMap2 = fieldMap
+ else:
+ if isinstance(fieldMap2, list):
+ # convert list to identity dictionary
+ fieldMap2 = { fld: fld for fld in fieldMap2 }
+
+ for ifld, ofld in fieldMap2.items():
+ i = fieldMap.get(ofld, None)
+ if i is None:
+                    raise RuntimeError(f'Output layer has no field named "{ofld}"')
+ fieldMap2[ifld] = i
+ source_import['field-map'] = fieldMap2
+
+ # validate field value mapping
+ valueMap = source_import.get('value-map', None)
+ if valueMap is not None:
+ for fldName, rules in valueMap.items():
+ if rules is None:
+ continue
+ if not isinstance(rules, list):
+ rules = [rules]
+ for idx, rule in enumerate(rules):
+ if rule is None or not isinstance(rule, dict):
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ if 'type' not in rule:
+ ruleType = rule['type'] = 'literal'
+ else:
+ ruleType = rule['type']
+ if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
+ ruleType is None or ruleType not in ('literal', 'regex')):
+ raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
+ if ruleType == 'regex':
+ rule['replace'] = re.compile(rule['replace'])
+ rules[idx] = ( rule['replace'], rule['with'] )
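+
+# Normalization example (hypothetical field and values): a value-map entry
+#   value-map:
+#     status:
+#       - { replace: Aktiv, with: active }
+#       - { type: regex, replace: '^Avslutad.*', with: closed }
+# is rewritten above into [('Aktiv', 'active'), (re.compile('^Avslutad.*'), 'closed')].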
+
+def processOutputLayer(ds : gdal.Dataset,
+ layername : str,
+ layerdef : dict[str,Any],
+ srs : Optional[osr.SpatialReference] = None,
+ cachedir : Path|None = None,
+ extent : ogr.Geometry|None = None,
+ dsTransaction : bool = False,
+ lyrcache : ogr.Layer|None = None,
+ force : bool = False) -> ImportStatus:
+ """Process an output layer."""
+
+ logging.info('Processing output layer "%s"', layername)
+ lyr = ds.GetLayerByName(layername)
+ if lyr is None:
+        raise RuntimeError(f'Could not open output layer "{layername}"')
+ if not lyr.TestCapability(ogr.OLCSequentialWrite):
+ raise RuntimeError(f'Output layer "{layername}" has no working '
+ 'CreateFeature() method')
+
+ sources = layerdef['sources']
+ if not (lyrcache is None or force or
+ areSourceFilesNewer(layername, sources=sources,
+ lyrcache=lyrcache,
+ cachedir=cachedir)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ return ImportStatus.IMPORT_NOCHANGE
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
+
+ description = layerdef.get('description', None)
+ if (description is not None and
+ lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
+ logging.warning('Could not set description metadata')
+
+ # setup output field mapping in the sources dictionary
+ setOutputFieldMap(lyr.GetLayerDefn(), sources)
+
+ return importSources(lyr=lyr, sources=sources,
+ cachedir=cachedir, extent=extent,
+ dsoTransaction=dsTransaction,
+ lyrcache=lyrcache,
+ force=force,
+ cluster_geometry=layerdef.get('cluster-geometry', False))
+
+def validate_sources(layers : dict[str, Any]) -> None:
+ """Mangle and validate layer sources and import definitions"""
+ toremove = set()
+ for layername, layerdefs in layers.items():
+ sources = layerdefs.get('sources', None)
+ if sources is None or len(sources) < 1:
+ logging.warning('Output layer "%s" has no definition, skipping', layername)
+ toremove.add(layername)
+ continue
+
+ for idx, layerdef in enumerate(sources):
+ importdef = layerdef.get('import', None)
+ if importdef is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import definition')
+
+ sourcedef = layerdef.get('source', None)
+ unar = None if sourcedef is None else sourcedef.get('unar', None)
+ src = None if sourcedef is None else sourcedef.get('path', None)
+
+ ds_srcpath = importdef.get('path', None)
+ if src is None and unar is None and ds_srcpath is not None:
+                # fall back to import:path if there is no unarchiving recipe
+ src = ds_srcpath
+ if unar is not None and ds_srcpath is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no import source path')
+ if src is None:
+ raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
+ 'has no source path')
+ layerdef['source'] = { 'path': src, 'unar': unar }
+
+ for layername in toremove:
+ layers.pop(layername)
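+
+# A source definition satisfying the above, sketched as YAML (paths are
+# hypothetical): import:path names the member to extract from source:path
+# according to the unarchiving recipe, whose structure is not validated here.
+#
+#   sources:
+#     - source:
+#         path: downloads/data.zip
+#         unar: {...}
+#       import:
+#         path: data/layer.shp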
+
+def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ unique : Optional[bool] = None,
+ nullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the layer cache table."""
+ n = defn.GetFieldCount()
+ if idx >= n:
+ return False
+ defn = defn.GetFieldDefn(idx)
+
+ b = True
+ name2 = defn.GetName()
+ if name2 != name:
+ logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
+ b = False
+
+ if nullable is not None and defn.IsNullable() != nullable:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'isn\'t')
+
+ if unique is not None and defn.IsUnique() != unique:
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") %s unique',
+ idx, name2, 'is' if defn.IsUnique() else 'isn\'t')
+
+ typ2 = defn.GetType()
+ if typ2 != typ:
+ logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
+ ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+ b = False
+
+ subtyp2 = defn.GetSubType()
+ if subtyp2 != subtyp:
+ logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
+ ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+ b = False
+
+ width2 = defn.GetWidth()
+ if width2 != 0 and (width == 0 or width2 < width):
+ # non-fatal
+ logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
+ idx, name2, width2, width)
+ return b
+
+def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
+ """Validate layer cache table."""
+ drvName = ds.GetDriver().ShortName
+ if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
+ logging.warning('Unsupported cache layer for output driver %s', drvName)
+ return False
+ lyr = ds.GetLayerByName(name)
+ if lyr is None:
+ logging.warning('Table "%s" does not exist', name)
+ return False
+
+ if not (lyr.TestCapability(ogr.OLCRandomWrite) and lyr.TestCapability(ogr.OLCUpdateFeature)):
+        logging.warning('Layer "%s" lacks OLCRandomWrite and/or OLCUpdateFeature '
+                        'capability, ignoring cache', name)
+ return False
+
+ defn = lyr.GetLayerDefn()
+ fields = [
+ { 'name': 'layername', 'typ': ogr.OFTString,
+ 'nullable': False, 'unique': True, 'width': 255 },
+ { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
+ 'nullable': False },
+ { 'name': 'fingerprint', 'typ': ogr.OFTBinary,
+ 'nullable': False, 'width': 32 },
+ ]
+ m = len(fields)
+ n = defn.GetFieldCount()
+ if n < m:
+ # this is fatal, and `all(bs)` is False so we return False below
+ logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
+ elif n != m:
+ logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
+ bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
+ if not all(bs):
+ return False
+
+ n = defn.GetGeomFieldCount()
+ if n > 0:
+ geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+ for i in range(n) ]
+ logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
+ name, n, ', '.join(geomFieldNames))
+
+ style = lyr.GetStyleTable()
+ if style is not None:
+ logging.warning('Layer cache "%s" has a style table "%s"',
+ name, style.GetLastStyleName())
+ return True
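+
+# The expected cache table roughly corresponds to the following PostgreSQL
+# DDL (a sketch only; the exact types follow from the OGR field definitions
+# above and the driver's type mapping):
+#
+#   CREATE TABLE layercache (
+#     layername    character varying(255) NOT NULL UNIQUE,
+#     last_updated timestamp with time zone NOT NULL,
+#     fingerprint  bytea NOT NULL
+#   );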
+
+def areSourceFilesNewer(layername : str,
+                        sources : list[dict[str,Any]],
+ lyrcache : ogr.Layer,
+ cachedir : Optional[Path] = None) -> bool:
+    """Return a boolean indicating whether the source files found on disk are
+    newer than the layer cache. That is, return False only when the last
+    modification and last change times of each source file are at or before
+    the `last_updated` value found in the layer cache."""
+
+ source_paths = set()
+ for source in sources:
+ # the same source_path can be used multiple times, stat(2) only once
+ source_path = source['source']['path']
+ source_paths.add(source_path)
+ if len(source_paths) == 0:
+ return False
+
+ t = None
+ mtimes_ns = {}
+ for source_path in source_paths:
+ path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ mtimes_ns[source_path] = st.st_mtime_ns
+            # take max(mtime, ctime): if we lock source paths, any update after
+            # acquiring the lock will yield a value larger than time.time_ns()
+ t2 = max(st.st_mtime_ns, st.st_ctime_ns)
+ if t is None or t < t2:
+ t = t2
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ return True
+ assert t is not None
+
+ attributeFilter = 'layername = ' + escape_literal_str(layername)
+ logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
+ lyrcache.SetAttributeFilter(attributeFilter)
+
+ feature = lyrcache.GetNextFeature()
+ if feature is None:
+ # not in cache
+ return True
+
+ if not feature.IsFieldSetAndNotNull(1):
+ ret = True
+ else:
+ # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
+ # [ year, month, day, hour, minute, second, timezone flag ]
+ dt = feature.GetFieldAsDateTime(1)
+ if dt[6] == ogr.TZFLAG_UNKNOWN:
+ logging.warning('Datetime specified with unknown timezone in layer cache\'s '
+ 'field #%d "%s", assuming local time', 1,
+ feature.GetDefnRef().GetFieldDefn(1).GetName())
+ tz = None
+ elif dt[6] == ogr.TZFLAG_LOCALTIME:
+ tz = None
+ elif dt[6] == ogr.TZFLAG_UTC:
+ tz = UTC
+ else:
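+            # other flag values encode the UTC offset in 15-minute units
+            # relative to ogr.TZFLAG_UTC (100), hence the factor of 900 seconds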
+ tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
+ ms, s = modf(dt[5])
+ dt = datetime(
+ year=dt[0], # including century
+            month=dt[1], # 01 ≤ month ≤ 12
+ day=dt[2], # 01 ≤ day ≤ 31
+ hour=dt[3], # 00 ≤ hour ≤ 23
+ minute=dt[4], # 00 ≤ minute ≤ 59
+ second=int(s), # 00 ≤ second ≤ 59
+ microsecond=round(ms*1000000),
+ tzinfo=tz
+ )
+ fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
+ logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
+ layername,
+ dt.isoformat(timespec='microseconds'),
+ fpr.hex() if fpr is not None else 'NULL')
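+        # compare at nanosecond resolution: the cached timestamp has at most
+        # microsecond precision, hence the truncation to µs before scaling to ns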
+ ret = int(dt.timestamp() * 1000000.) * 1000 < t
+
+ if lyrcache.GetNextFeature() is not None:
+ raise RuntimeError(f'Duplicate key {layername}')
+
+ if not ret:
+ for source_path, mtime_ns in sorted(mtimes_ns.items()):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtime_ns // 1000) / 1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return ret
+
+def getLastMTimes(layerdefs : Iterable[dict[str,Any]], basedir : Optional[Path] = None) -> dict[str,int]:
+    """Return a dictionary mapping source paths to their last modification
+    time (as a timestamp in milliseconds)."""
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ if source_path in ret:
+ continue
+ path = source_path if basedir is None else str(basedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ ret[source_path] = st.st_mtime_ns // 1000000
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ pass
+ return ret
+
+def lockSourcePaths(layerdefs : Iterable[dict[str,Any]], lockdir: str) -> dict[str,int]:
+    """Place shared locks on each source path and return their respective file
+    descriptors. We could do that one layerdef at a time (one output layer at a
+    time) to reduce the time during which the sources are prevented from being
+    updated/downloaded, but there is some value in having consistency across
+    the whole import process."""
+ umask = os.umask(0o002)
+ lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+ try:
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ if source_path in ret:
+ continue
+ lockfile = getSourcePathLockFileName(source_path)
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+ dir_fd=lockdir_fd)
+ logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+ source_path, lockfile)
+ flock(lock_fd, LOCK_SH)
+ ret[source_path] = lock_fd
+ return ret
+ finally:
+ try:
+ os.close(lockdir_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not close lockdir')
+ os.umask(umask)
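+
+# Presumably the download side takes LOCK_EX on the same lock files while
+# fetching, so holding LOCK_SH here keeps every source file stable for the
+# whole import run. (An assumption about the counterpart process; nothing in
+# this file enforces it.)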
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+ """Release shared locks on the source paths. Closed FDs are removed from
+ the dictionary in place."""
+ toremove = set()
+ for path, lock_fd in lock_fds.items():
+ try:
+ os.close(lock_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not release lock for %s', path)
+ else:
+ logging.debug('Released lock for %s', path)
+ toremove.add(path)
+ for path in toremove:
+ lock_fds.pop(path)
+
+# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
+def main() -> NoReturn:
+ common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
+
+ parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
+ parser.add_argument('--cachedir', default=None,
+ help=f'cache directory (default: {os.curdir})')
+ parser.add_argument('--debug', action='count', default=0,
+ help=argparse.SUPPRESS)
+ parser.add_argument('--lockfile', default=None,
+ help='obtain an exclusive lock before processing')
+ parser.add_argument('--lockdir-sources', default=None,
+ help='optional directory for lock files to source paths')
+ parser.add_argument('--mvtdir', default=None,
+ help='optional directory for Mapbox Vector Tiles (MVT)')
+ parser.add_argument('--mvt-compress', default=False, action='store_true',
+ help='whether to compress Mapbox Vector Tiles (MVT) files')
+ parser.add_argument('--rasterdir', default=None,
+ help='optional directory for raster files')
+ parser.add_argument('--metadata-compress', default=False, action='store_true',
+ help='whether to compress metadata.json files')
+ parser.add_argument('--force', default=False, action='store_true',
+ help='import even if no new changes were detected')
+ parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
+ args = parser.parse_args()
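+
+    # Example invocation (hypothetical paths and group name):
+    #   geodata-import --cachedir /var/cache/geodata \
+    #       --lockfile /run/lock/geodata-import.lock \
+    #       --lockdir-sources /run/lock/geodata-sources \
+    #       --mvtdir /srv/tiles --mvt-compress mygroup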
+
+ if args.debug > 0: # pylint: disable=duplicate-code
+ logging.getLogger().setLevel(logging.DEBUG)
+ if args.debug > 1:
+ gdal.ConfigurePythonLogging(enable_debug=True)
+
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
+
+ # validate configuration
+ if 'dataset' not in config:
+ raise BadConfiguration('Configuration does not specify output dataset')
+
+ layers = config.get('layers', {})
+ validate_sources(layers)
+
+ # set global GDAL/OGR configuration options
+ for pszKey, pszValue in config.get('GDALconfig', {}).items():
+ logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
+ gdal.SetConfigOption(pszKey, pszValue)
+
+ # get configured Spatial Reference System and extent
+ srs = getSRS(config.get('SRS', None))
+ extent = getExtent(config.get('extent', None), srs=srs)
+
+ if args.lockfile is not None:
+ # obtain an exclusive lock and don't release it until exiting the program
+ lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
+ logging.debug('flock("%s", LOCK_EX)', args.lockfile)
+ flock(lock_fd, LOCK_EX)
+
+ if args.mvtdir is not None:
+ args.mvtdir = Path(args.mvtdir)
+ if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it
+ raise RuntimeError('Invalid value for --mvtdir')
+ args.mvtdir.parent.mkdir(parents=True, exist_ok=True)
+
+ if args.cachedir is not None:
+ args.cachedir = Path(args.cachedir)
+ if args.lockdir_sources is None:
+ sourcePathLocks = None
+ else:
+ sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+ lockdir=args.lockdir_sources)
+
+ # special handling for raster layers
+ if any(l.get('type') == 'raster' for l in layers.values()):
+ if not all(l.get('type') == 'raster' for l in layers.values()):
+ raise NotImplementedError('Mix of raster and vector layers is not supported')
+ if args.rasterdir is None:
+ raise RuntimeError('Missing required value for --rasterdir')
+ if len(layers) != 1:
+ raise RuntimeError('Raster layers need to be processed one at a time')
+ args.rasterdir = Path(args.rasterdir)
+ if args.rasterdir == Path(): # make sure it's not curdir as we don't want to exchange it
+ raise RuntimeError('Invalid value for --rasterdir')
+ args.rasterdir.parent.mkdir(parents=True, exist_ok=True)
+ last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
+ rv = 0
+ for layername, layerdef in layers.items():
+ try:
+ processRaster(layername, layerdef,
+ sources=parse_config_dl(config.get('downloads', [])),
+ license_info=config.get('license-info', {}),
+ last_modified=last_modified,
+ dst=args.rasterdir,
+ cachedir=args.cachedir,
+ extent=extent,
+ compress_metadata=args.metadata_compress)
+ except Exception: # pylint: disable=broad-exception-caught
+ rv = 1
+ traceback.print_exc()
+ sys.exit(rv)
+
+ # open output dataset (possibly create it first)
+ dso = openOutputDS(config['dataset'])
+
+ validate_schema(layers,
+ drvo=dso.GetDriver(),
+ lco_defaults=config['dataset'].get('create-layer-options', None))
+
+ # create all output layers before starting the transaction
+ for layername, layerdef in layers.items():
+ lyr = dso.GetLayerByName(layername)
+ if lyr is not None:
+ # TODO dso.DeleteLayer(layername) if --overwrite and
+ # dso.TestCapability(ogr.ODsCDeleteLayer)
+ # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
+ continue
+ if not dso.TestCapability(ogr.ODsCCreateLayer):
+ raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
+ 'support layer creation')
+ createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
+
+ if (dso.TestCapability(ogr.ODsCTransactions) and
+ # we need SAVEPOINT support
+ dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
+ logging.debug('Starting transaction')
+ dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
+ else:
+ logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
+ dso.GetDriver().ShortName)
+ dsoTransaction = False
+
+ # validate layer cache
+ lyr_cache = config['dataset'].get('layercache', None)
+ if lyr_cache is None:
+ pass
+ elif validateCacheLayer(dso, lyr_cache):
+ lyr_cache = dso.GetLayerByName(lyr_cache)
+ else:
+ if not args.force:
+ logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
+ args.force = True
+ lyr_cache = None
+
+ rv = 0
+ try:
+ r = {}
+ n = 0
+ start = time_monotonic()
+ for layername, layerdef in layers.items():
+ r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
+ srs=srs,
+ cachedir=args.cachedir,
+ extent=extent,
+ dsTransaction=dsoTransaction,
+ lyrcache=lyr_cache,
+ force=args.force)
+ n += 1
+ logging.info('Import result status for layer "%s": %s', layername, str(r0))
+ if r0 == ImportStatus.IMPORT_ERROR:
+ rv = 1
+ if dsoTransaction:
+ dsoTransaction = False
+ logging.debug('Rolling back transaction')
+ # no need to catch the exception here
+                    if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not rollback transaction')
+ break
+ elapsed = time_monotonic() - start
+ logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))
+
+ # get mtimes before releasing the source locks
+ last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
+
+ if sourcePathLocks is not None:
+ releaseSourcePathLocks(sourcePathLocks)
+
+ export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
+ if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
+ pass
+ elif len(export_layers) == 0:
+ logging.warning('--mvtdir option used but no layer has a publication definition')
+ elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
+ and args.mvtdir.is_dir()):
+        logging.info('Skipping MVT export for group %s (no changes)',
+                     ', '.join(args.groupname) if args.groupname else '*')
+ else:
+ exportMVT(dso,
+ layers=export_layers,
+ sources=parse_config_dl(config.get('downloads', [])),
+ license_info=config.get('license-info', {}),
+ last_modified=last_modified,
+ dst=args.mvtdir,
+ default_options=config.get('vector-tiles', None),
+ compress=args.mvt_compress,
+ compress_metadata=args.metadata_compress)
+
+ if dsoTransaction:
+ dsoTransaction = False
+ logging.debug('Committing transaction')
+ if dso.CommitTransaction() != ogr.OGRERR_NONE:
+ logging.error('Could not commit transaction')
+ rv = 1
+
+ except Exception: # pylint: disable=broad-exception-caught
+        if dsoTransaction:
+            logging.exception('Exception occurred within transaction, rolling back')
+            try:
+                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
+                    logging.error('Could not rollback transaction')
+            except Exception: # pylint: disable=broad-exception-caught
+                logging.exception('Could not rollback transaction')
+            rv = 1
+ else:
+ traceback.print_exc()
+ sys.exit(1)
+
+ finally:
+ lyr_cache = None
+ dso = None
+ extent = None
+ srs = None
+ sys.exit(rv)
+
+gdal.UseExceptions()
+main()