diff options
Diffstat (limited to 'geodata-import')
| -rwxr-xr-x | geodata-import | 794 |
1 file changed, 794 insertions, 0 deletions
#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
from stat import S_ISREG
import sys
from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
from datetime import datetime, timedelta, timezone, UTC
from math import modf
from pathlib import Path
from time import monotonic as time_monotonic
from typing import Any, Iterable, Optional, NoReturn
import traceback

from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
    DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
    DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
from osgeo import gdalconst

import common
from common import (
    BadConfiguration,
    parse_config_dl,
    escape_identifier,
    escape_literal_str,
    getSourcePathLockFileName
)
from common_gdal import (
    gdalGetMetadataItem,
    getSRS,
    getExtent,
    parseGeomType,
    parseFieldType,
    parseSubFieldType,
    parseTimeZone
)
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    importSources,
    ImportStatus
)
from export_mvt import exportMVT
from export_raster import processRaster

def setFieldIf(cond : bool,
               attrName : str,
               val : Any,
               data : dict[str, Any],
               fldName : str,
               drvName : str,
               log = logging.warning) -> None:
    """Conditionally set a field: when `cond` holds, store `val` under
    `attrName` in `data`; otherwise log (at the given level) that the
    attribute is not supported by the output driver and drop it."""
    if cond:
        data[attrName] = val
    else:
        if isinstance(val, str):
            val2 = '"' + val + '"'
        else:
            val2 = str(val)
        log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
            attrName, val2, fldName, drvName)

# pylint: disable-next=too-many-branches
def validate_schema(layers : dict[str, Any],
                    drvo : Optional[gdal.Driver] = None,
                    lco_defaults : Optional[dict[str, str]] = None) -> None:
    """Validate layer creation options and schema.  The schema is
    modified in place with the parsed result.
    (We need the driver of the output dataset to determine capability on
    constraints.)"""

    # list of capability flags supported by the CreateField() API
    # NOTE(review): drvo is effectively required despite the Optional default —
    # a None value would raise AttributeError on the next line
    drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
    drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
    drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags

    # cache driver capabilities
    drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
    drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
                                 gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
    drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
                               gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
    drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
                                gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
    drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags

    for layername, layerdef in layers.items():
        create = layerdef.get('create', None)
        if create is None or len(create) < 1:
            logging.warning('Layer "%s" has no creation schema', layername)
            continue

        # prepend global layer creation options (dataset:create-layer-options)
        # and build the option=value list
        lco = create.get('options', None)
        if lco_defaults is not None or lco is not None:
            options = []
            if lco_defaults is not None:
                options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
            if lco is not None:
                options += [ k + '=' + str(v) for k, v in lco.items() ]
            create['options'] = options

        # parse geometry type
        create['geometry-type'] = parseGeomType(create.get('geometry-type', None))

        fields = create.get('fields', None)
        if fields is None:
            create['fields'] = []
        else:
            fields_set = set()
            for idx, fld_def in enumerate(fields):
                fld_name = fld_def.get('name', None)
                if fld_name is None or fld_name == '':
                    raise BadConfiguration(f'Field #{idx} has no name')
                if fld_name in fields_set:
                    raise BadConfiguration(f'Duplicate field "{fld_name}"')
                fields_set.add(fld_name)

                # normalize the configured definition into the keyword names
                # expected by the field-creation code
                fld_def2 = { 'Name': fld_name }
                for k, v in fld_def.items():
                    k2 = k.lower()
                    if k2 == 'name':
                        pass
                    elif k2 in ('alternativename', 'alias'):
                        setFieldIf(drvoSupportsFieldAlternativeName,
                                   'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)
                    elif k2 == 'comment':
                        setFieldIf(drvoSupportsFieldComment,
                                   'Comment', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)

                    elif k2 == 'type':
                        fld_def2['Type'] = parseFieldType(v)
                    elif k2 == 'subtype':
                        fld_def2['SubType'] = parseSubFieldType(v)
                    elif k2 == 'tz':
                        fld_def2['TZFlag'] = parseTimeZone(v)
                    elif k2 == 'width' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Width', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'precision' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Precision', v, fld_def2, fld_name, drvo.ShortName)

                    # constraints
                    elif k2 == 'default':
                        setFieldIf(drvoSupportsFieldDefault,
                                   'Default', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'nullable' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldNullable,
                                   'Nullable', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'unique' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldUnique,
                                   'Unique', v, fld_def2, fld_name, drvo.ShortName)
                    else:
                        raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')

                fields[idx] = fld_def2

def setOutputFieldMap(defn : ogr.FeatureDefn, sources : list[dict[str, Any]]):
    """Setup output field mapping, modifying the sources dictionary in place."""
    # map output field name -> output field index
    fieldMap = {}
    n = defn.GetFieldCount()
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        fldName = fld.GetName()
        fieldMap[fldName] = i

    for source in sources:
        source_import = source['import']

        fieldMap2 = source_import.get('field-map', None)
        if fieldMap2 is None:
            fieldMap2 = fieldMap
        else:
            if isinstance(fieldMap2, list):
                # convert list to identity dictionary
                fieldMap2 = { fld: fld for fld in fieldMap2 }

            # resolve output field names to indices
            for ifld, ofld in fieldMap2.items():
                i = fieldMap.get(ofld, None)
                if i is None:
                    raise RuntimeError(f'Output layer has no field named "{ofld}"')
                fieldMap2[ifld] = i
        source_import['field-map'] = fieldMap2

        # validate field value mapping
        valueMap = source_import.get('value-map', None)
        if valueMap is not None:
            for fldName, rules in valueMap.items():
                if rules is None:
                    continue
                if not isinstance(rules, list):
                    # NOTE(review): this normalized list is not written back to
                    # valueMap[fldName], so the tuple rewriting below is lost
                    # for single-rule (non-list) entries — confirm downstream
                    # re-normalizes, otherwise this looks like a bug
                    rules = [rules]
                for idx, rule in enumerate(rules):
                    if rule is None or not isinstance(rule, dict):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if 'type' not in rule:
                        ruleType = rule['type'] = 'literal'
                    else:
                        ruleType = rule['type']
                    if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
                            ruleType is None or ruleType not in ('literal', 'regex')):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if ruleType == 'regex':
                        rule['replace'] = re.compile(rule['replace'])
                    rules[idx] = ( rule['replace'], rule['with'] )

def processOutputLayer(ds : gdal.Dataset,
                       layername : str,
                       layerdef : dict[str,Any],
                       srs : Optional[osr.SpatialReference] = None,
                       cachedir : Path|None = None,
                       extent : ogr.Geometry|None = None,
                       dsTransaction : bool = False,
                       lyrcache : ogr.Layer|None = None,
                       force : bool = False) -> ImportStatus:
    """Process an output layer: skip it when the cache says the sources are
    unchanged (unless `force`), otherwise validate the layer against the
    configured schema, set up field mappings, and import all sources."""

    logging.info('Processing output layer "%s"', layername)
    lyr = ds.GetLayerByName(layername)
    if lyr is None:
        raise RuntimeError(f'Failed to create output layer "{layername}"??')
    if not lyr.TestCapability(ogr.OLCSequentialWrite):
        raise RuntimeError(f'Output layer "{layername}" has no working '
                           'CreateFeature() method')

    sources = layerdef['sources']
    if not (lyrcache is None or force or
            areSourceFilesNewer(layername, sources=sources,
                                lyrcache=lyrcache,
                                cachedir=cachedir)):
        logging.info('Output layer "%s" is up to date, skipping', layername)
        return ImportStatus.IMPORT_NOCHANGE

    validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

    description = layerdef.get('description', None)
    if (description is not None and
            lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
        logging.warning('Could not set description metadata')

    # setup output field mapping in the sources dictionary
    setOutputFieldMap(lyr.GetLayerDefn(), sources)

    return importSources(lyr=lyr, sources=sources,
                         cachedir=cachedir, extent=extent,
                         dsoTransaction=dsTransaction,
                         lyrcache=lyrcache,
                         force=force,
                         cluster_geometry=layerdef.get('cluster-geometry', False))

def validate_sources(layers : dict[str, Any]) -> None:
    """Mangle and validate layer sources and import definitions.
    Layers without any source definition are removed from `layers`."""
    toremove = set()
    for layername, layerdefs in layers.items():
        sources = layerdefs.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Output layer "%s" has no definition, skipping', layername)
            toremove.add(layername)
            continue

        for idx, layerdef in enumerate(sources):
            importdef = layerdef.get('import', None)
            if importdef is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import definition')

            sourcedef = layerdef.get('source', None)
            unar = None if sourcedef is None else sourcedef.get('unar', None)
            src = None if sourcedef is None else sourcedef.get('path', None)

            ds_srcpath = importdef.get('path', None)
            if src is None and unar is None and ds_srcpath is not None:
                # fallback to import:path if there is no unarchiving recipe
                src = ds_srcpath
            if unar is not None and ds_srcpath is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import source path')
            if src is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no source path')
            layerdef['source'] = { 'path': src, 'unar': unar }

    for layername in toremove:
        layers.pop(layername)

def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
                            name : str,
                            typ : int,
                            subtyp : int = ogr.OFSTNone,
                            width : int = 0,
                            unique : Optional[bool] = None,
                            nullable : Optional[bool] = None) -> bool:
    """Validate field #idx from the layer cache table.  Name, type and
    subtype mismatches are fatal (return False); nullable/unique/width
    mismatches only produce a warning."""
    n = defn.GetFieldCount()
    if idx >= n:
        return False
    defn = defn.GetFieldDefn(idx)

    b = True
    name2 = defn.GetName()
    if name2 != name:
        logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name)
        b = False

    if nullable is not None and defn.IsNullable() != nullable:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
                        idx, name2, 'is' if defn.IsNullable() else 'isn\'t')

    if unique is not None and defn.IsUnique() != unique:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s unique',
                        idx, name2, 'is' if defn.IsUnique() else 'isn\'t')

    typ2 = defn.GetType()
    if typ2 != typ:
        logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2,
                        ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
        b = False

    subtyp2 = defn.GetSubType()
    if subtyp2 != subtyp:
        logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2,
                        ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
        b = False

    width2 = defn.GetWidth()
    if width2 != 0 and (width == 0 or width2 < width):
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
                        idx, name2, width2, width)
    return b

def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
    """Validate layer cache table: PostgreSQL output only, with the expected
    (layername, last_updated, fingerprint) schema and no geometry column."""
    drvName = ds.GetDriver().ShortName
    if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
        logging.warning('Unsupported cache layer for output driver %s', drvName)
        return False
    lyr = ds.GetLayerByName(name)
    if lyr is None:
        logging.warning('Table "%s" does not exist', name)
        return False

    if not (lyr.TestCapability(ogr.OLCRandomWrite) and lyr.TestCapability(ogr.OLCUpdateFeature)):
        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
                        'ignoring cache', name)
        return False

    defn = lyr.GetLayerDefn()
    fields = [
        { 'name': 'layername', 'typ': ogr.OFTString,
          'nullable': False, 'unique': True, 'width': 255 },
        { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
          'nullable': False },
        { 'name': 'fingerprint', 'typ': ogr.OFTBinary,
          'nullable': False, 'width': 32 },
    ]
    m = len(fields)
    n = defn.GetFieldCount()
    if n < m:
        # this is fatal, and `all(bs)` is False so we return False below
        logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
    elif n != m:
        logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
    bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
    if not all(bs):
        return False

    n = defn.GetGeomFieldCount()
    if n > 0:
        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                           for i in range(n) ]
        logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
                        name, n, ', '.join(geomFieldNames))

    style = lyr.GetStyleTable()
    if style is not None:
        logging.warning('Layer cache "%s" has a style table "%s"',
                        name, style.GetLastStyleName())
    return True

def areSourceFilesNewer(layername : str,
                        sources : list[dict[str,Any]],
                        lyrcache : ogr.Layer,
                        cachedir : Optional[Path] = None) -> bool:
    """Return a boolean indicating whether the layer cache is up to date with
    respect to the source files found on disk.  That is, the last modification
    and last changed time of each source file needs to be equal or lower than
    the `last_updated` value found in the layer cache."""

    source_paths = set()
    for source in sources:
        # the same source_path can be used multiple times, stat(2) only once
        source_path = source['source']['path']
        source_paths.add(source_path)
    if len(source_paths) == 0:
        return False

    t = None
    mtimes_ns = {}
    for source_path in source_paths:
        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
        try:
            st = os.stat(path)
            if not S_ISREG(st.st_mode):
                raise FileNotFoundError
            mtimes_ns[source_path] = st.st_mtime_ns
            # take max(mtime, ctime): if we lock source paths any update after
            # aquiring the lock will yield a value larger than time.time_ns()
            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
            if t is None or t < t2:
                t = t2
        except (OSError, ValueError):
            #logging.warning('Could not stat(%s)', path)
            return True
    assert t is not None

    attributeFilter = 'layername = ' + escape_literal_str(layername)
    logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
    lyrcache.SetAttributeFilter(attributeFilter)

    feature = lyrcache.GetNextFeature()
    if feature is None:
        # not in cache
        return True

    if not feature.IsFieldSetAndNotNull(1):
        ret = True
    else:
        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
        # [ year, month, day, hour, minute, second, timezone flag ]
        dt = feature.GetFieldAsDateTime(1)
        if dt[6] == ogr.TZFLAG_UNKNOWN:
            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
                            'field #%d "%s", assuming local time', 1,
                            feature.GetDefnRef().GetFieldDefn(1).GetName())
            tz = None
        elif dt[6] == ogr.TZFLAG_LOCALTIME:
            tz = None
        elif dt[6] == ogr.TZFLAG_UTC:
            tz = UTC
        else:
            # TZFLAG encodes the UTC offset in 15min increments
            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
        ms, s = modf(dt[5])
        dt = datetime(
            year=dt[0], # including century
            month=dt[1], # 01 ≤ year ≤ 12
            day=dt[2], # 01 ≤ day ≤ 31
            hour=dt[3], # 00 ≤ hour ≤ 23
            minute=dt[4], # 00 ≤ minute ≤ 59
            second=int(s), # 00 ≤ second ≤ 59
            microsecond=round(ms*1000000),
            tzinfo=tz
        )
        fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
        logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
                      layername,
                      dt.isoformat(timespec='microseconds'),
                      fpr.hex() if fpr is not None else 'NULL')
        # compare at nanosecond granularity (cache timestamp has µs resolution)
        ret = int(dt.timestamp() * 1000000.) * 1000 < t

    if lyrcache.GetNextFeature() is not None:
        raise RuntimeError(f'Duplicate key {layername}')

    if not ret:
        for source_path, mtime_ns in sorted(mtimes_ns.items()):
            # XXX datetime.fromtimestamp() doesn't support nanosecond input
            # https://github.com/python/cpython/issues/59648
            mtime = (mtime_ns // 1000) / 1000000.
            dt = datetime.fromtimestamp(mtime)
            logging.info('Source file %s is unchanged (last modification time %s)',
                         source_path, dt.astimezone().isoformat(timespec='seconds'))
    return ret

def getLastMTimes(layerdefs : Iterable[dict[str,Any]],
                  basedir : Optional[Path] = None) -> dict[str,int]:
    """Return a dictionary mapping source paths to their last modification time
    (as a timestamp in milliseconds).  Paths that cannot be stat(2)'d or are
    not regular files are silently omitted."""
    ret = {}
    for layerdef in layerdefs:
        for source in layerdef['sources']:
            source_path = source['source']['path']
            if source_path in ret:
                continue
            path = source_path if basedir is None else str(basedir.joinpath(source_path))
            try:
                st = os.stat(path)
                if not S_ISREG(st.st_mode):
                    raise FileNotFoundError
                ret[source_path] = st.st_mtime_ns // 1000000
            except (OSError, ValueError):
                #logging.warning('Could not stat(%s)', path)
                pass
    return ret

def lockSourcePaths(layerdefs : Iterable[dict[str,Any]], lockdir: str) -> dict[str,int]:
    """Place shared locks on each source path and return their respective file
    descriptors.  We could do that one layerdef at a time (one output layer at a
    time) to reduce the time during which the sources are prevented from being
    updated/downloaded, but there is some value in having consistency across the
    whole import process."""
    umask = os.umask(0o002)
    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
    try:
        ret = {}
        for layerdef in layerdefs:
            for source in layerdef['sources']:
                source_path = source['source']['path']
                if source_path in ret:
                    continue
                lockfile = getSourcePathLockFileName(source_path)
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                                  dir_fd=lockdir_fd)
                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                              source_path, lockfile)
                flock(lock_fd, LOCK_SH)
                ret[source_path] = lock_fd
        return ret
    finally:
        try:
            os.close(lockdir_fd)
        except (OSError, ValueError):
            logging.exception('Could not close lockdir')
        os.umask(umask)

def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
    """Release shared locks on the source paths.  Closed FDs are removed from
    the dictionary in place."""
    toremove = set()
    for path, lock_fd in lock_fds.items():
        try:
            os.close(lock_fd)
        except (OSError, ValueError):
            logging.exception('Could not release lock for %s', path)
        else:
            logging.debug('Released lock for %s', path)
            toremove.add(path)
    for path in toremove:
        lock_fds.pop(path)

# pylint: disable-next=too-many-branches, too-many-statements
def main() -> NoReturn:
    """Entry point: parse command-line arguments and the configuration, then
    extract and import the configured layers (vector or raster), optionally
    exporting Mapbox Vector Tiles.  Always terminates via sys.exit()."""
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files to source paths')
    parser.add_argument('--mvtdir', default=None,
                        help='optional directory for Mapbox Vector Tiles (MVT)')
    parser.add_argument('--mvt-compress', default=False, action='store_true',
                        help='whether to compress Mapbox Vector Tiles (MVT) files')
    parser.add_argument('--rasterdir', default=None,
                        help='optional directory for raster files')
    parser.add_argument('--metadata-compress', default=False, action='store_true',
                        help='whether to compress metadata.json files')
    parser.add_argument('--force', default=False, action='store_true',
                        help='import even if no new changes were detected')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    if args.debug > 0: # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    layers = config.get('layers', {})
    validate_sources(layers)

    # set global GDAL/OGR configuration options
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)

    if args.mvtdir is not None:
        args.mvtdir = Path(args.mvtdir)
        if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it
            raise RuntimeError('Invalid value for --mvtdir')
        args.mvtdir.parent.mkdir(parents=True, exist_ok=True)

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)
    if args.lockdir_sources is None:
        sourcePathLocks = None
    else:
        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
                                          lockdir=args.lockdir_sources)

    # special handling for raster layers
    if any(l.get('type') == 'raster' for l in layers.values()):
        if not all(l.get('type') == 'raster' for l in layers.values()):
            raise NotImplementedError('Mix of raster and vector layers is not supported')
        if args.rasterdir is None:
            raise RuntimeError('Missing required value for --rasterdir')
        if len(layers) != 1:
            raise RuntimeError('Raster layers need to be processed one at a time')
        args.rasterdir = Path(args.rasterdir)
        if args.rasterdir == Path(): # make sure it's not curdir as we don't want to exchange it
            raise RuntimeError('Invalid value for --rasterdir')
        args.rasterdir.parent.mkdir(parents=True, exist_ok=True)
        last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
        rv = 0
        for layername, layerdef in layers.items():
            try:
                processRaster(layername, layerdef,
                              sources=parse_config_dl(config.get('downloads', [])),
                              license_info=config.get('license-info', {}),
                              last_modified=last_modified,
                              dst=args.rasterdir,
                              cachedir=args.cachedir,
                              extent=extent,
                              compress_metadata=args.metadata_compress)
            except Exception: # pylint: disable=broad-exception-caught
                rv = 1
                traceback.print_exc()
        sys.exit(rv)

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    validate_schema(layers,
                    drvo=dso.GetDriver(),
                    lco_defaults=config['dataset'].get('create-layer-options', None))

    # create all output layers before starting the transaction
    for layername, layerdef in layers.items():
        lyr = dso.GetLayerByName(layername)
        if lyr is not None:
            # TODO dso.DeleteLayer(layername) if --overwrite and
            # dso.TestCapability(ogr.ODsCDeleteLayer)
            # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
            continue
        if not dso.TestCapability(ogr.ODsCCreateLayer):
            raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
                               'support layer creation')
        createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))

    if (dso.TestCapability(ogr.ODsCTransactions) and
            # we need SAVEPOINT support
            dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
        logging.debug('Starting transaction')
        dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
    else:
        logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
                        dso.GetDriver().ShortName)
        dsoTransaction = False

    # validate layer cache
    lyr_cache = config['dataset'].get('layercache', None)
    if lyr_cache is None:
        pass
    elif validateCacheLayer(dso, lyr_cache):
        lyr_cache = dso.GetLayerByName(lyr_cache)
    else:
        if not args.force:
            logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
            args.force = True
        lyr_cache = None

    rv = 0
    try:
        r = {}
        n = 0
        start = time_monotonic()
        for layername, layerdef in layers.items():
            r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
                                                   srs=srs,
                                                   cachedir=args.cachedir,
                                                   extent=extent,
                                                   dsTransaction=dsoTransaction,
                                                   lyrcache=lyr_cache,
                                                   force=args.force)
            n += 1
            logging.info('Import result status for layer "%s": %s', layername, str(r0))
            if r0 == ImportStatus.IMPORT_ERROR:
                rv = 1
                if dsoTransaction:
                    dsoTransaction = False
                    logging.debug('Rolling back transaction')
                    # no need to catch the exception here
                    # NOTE(review): this calls CommitTransaction() while the
                    # surrounding log messages talk about a rollback — confirm
                    # whether committing the previously imported layers (the
                    # failed one being rolled back to its savepoint) is intended
                    if dso.CommitTransaction() != ogr.OGRERR_NONE:
                        logging.error('Could not rollback transaction')
                    break
        elapsed = time_monotonic() - start
        logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))

        # get mtimes before releasing the source locks
        last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)

        if sourcePathLocks is not None:
            releaseSourcePathLocks(sourcePathLocks)

        export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
        if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
            pass
        elif len(export_layers) == 0:
            logging.warning('--mvtdir option used but no layer has a publication definition')
        elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
              and args.mvtdir.is_dir()):
            logging.info('Skipping MVT export for group %s (no changes)',
                         ', '.join(args.groupname) if args.groupname is not None else '*')
        else:
            exportMVT(dso,
                      layers=export_layers,
                      sources=parse_config_dl(config.get('downloads', [])),
                      license_info=config.get('license-info', {}),
                      last_modified=last_modified,
                      dst=args.mvtdir,
                      default_options=config.get('vector-tiles', None),
                      compress=args.mvt_compress,
                      compress_metadata=args.metadata_compress)

        if dsoTransaction:
            dsoTransaction = False
            logging.debug('Committing transaction')
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
                rv = 1

    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
            logging.exception('Exception occurred within transaction, rolling back')
            try:
                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                    logging.error('Could not rollback transaction')
            except Exception: # pylint: disable=broad-exception-caught
                logging.exception('Could not rollback transaction')
        else:
            traceback.print_exc()
        # record the failure and fall through to the `finally` block, which
        # terminates the process.  (Calling sys.exit(1) here would be clobbered
        # by the unconditional sys.exit(rv) below: an exit raised in `finally`
        # replaces any in-flight SystemExit, previously turning fatal errors
        # into a successful exit status.)
        rv = 1

    finally:
        # drop GDAL object references before exiting so the dataset is closed
        # and flushed deterministically
        lyr_cache = None
        dso = None
        extent = None
        srs = None
        sys.exit(rv)

gdal.UseExceptions()
main()
