From ca91a579770c89d25aefae220079bf336fa88dc9 Mon Sep 17 00:00:00 2001 From: Guilhem Moulin Date: Fri, 6 Mar 2026 10:52:43 +0100 Subject: Rename "webmap" to the less generic "geodata". The database has uses beyond the webmap. --- webmap-import | 794 ---------------------------------------------------------- 1 file changed, 794 deletions(-) delete mode 100755 webmap-import (limited to 'webmap-import') diff --git a/webmap-import b/webmap-import deleted file mode 100755 index 2f0f5b4..0000000 --- a/webmap-import +++ /dev/null @@ -1,794 +0,0 @@ -#!/usr/bin/python3 - -#---------------------------------------------------------------------- -# Backend utilities for the Klimatanalys Norr project (extract/import layers) -# Copyright © 2024-2025 Guilhem Moulin -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -#---------------------------------------------------------------------- - -# pylint: disable=invalid-name, missing-module-docstring, fixme - -from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY -import os -from stat import S_ISREG -import sys -from fcntl import flock, LOCK_EX, LOCK_SH -import logging -import argparse -import re -from datetime import datetime, timedelta, timezone, UTC -from math import modf -from pathlib import Path -from time import monotonic as time_monotonic -from typing import Any, Optional, NoReturn -import traceback - -from osgeo import gdal, ogr, osr -from osgeo.gdalconst import ( - CE_None as GDAL_CE_None, - DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS, - DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS, - DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS, -) -from osgeo import gdalconst - -import common -from common import ( - BadConfiguration, - parse_config_dl, - escape_identifier, - escape_literal_str, - getSourcePathLockFileName -) -from common_gdal import ( - gdalGetMetadataItem, - getSRS, - getExtent, - parseGeomType, - parseFieldType, - parseSubFieldType, - parseTimeZone -) -from import_source import ( - openOutputDS, - createOutputLayer, - validateOutputLayer, - importSources, - ImportStatus -) -from export_mvt import exportMVT -from export_raster import processRaster - -def setFieldIf(cond : bool, - attrName : str, - val : Any, - data : dict[str, Any], - fldName : str, - drvName : str, - log = logging.warning) -> None: - """Conditionally set a field""" - if cond: - data[attrName] = val - else: - if isinstance(val, str): - val2 = '"' + val + '"' - else: - val2 = str(val) - log('Ignoring %s=%s on field "%s" (not supported by %s driver)', - attrName, val2, fldName, drvName) - -# pylint: disable-next=too-many-branches -def validate_schema(layers : dict[str, Any], - drvo : Optional[gdal.Driver] = None, - lco_defaults : Optional[dict[str, str]] = None) -> None: - """Validate layer creation options and schema. The schema is - modified in place with the parsed result. - (We need the driver of the output dataset to determine capability on - constraints.)""" - - # list of capability flags supported by the CreateField() API - drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS) - drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else [] - drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags - - # cache driver capabilities - drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags - drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and - gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS)) - drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and - gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS)) - drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and - gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS)) - drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags - - for layername, layerdef in layers.items(): - create = layerdef.get('create', None) - if create is None or len(create) < 1: - logging.warning('Layer "%s" has no creation schema', layername) - continue - - # prepend global layer creation options (dataset:create-layer-options) - # and build the option=value list - lco = create.get('options', None) - if lco_defaults is not None or lco is not None: - options = [] - if lco_defaults is not None: - options += [ k + '=' + str(v) for k, v in lco_defaults.items() ] - if lco is not None: - options += [ k + '=' + str(v) for k, v in lco.items() ] - create['options'] = options - - # parse geometry type - create['geometry-type'] = parseGeomType(create.get('geometry-type', None)) - - fields = create.get('fields', None) - if fields is None: - create['fields'] = [] - else: - fields_set = set() - for idx, fld_def in enumerate(fields): - fld_name = fld_def.get('name', None) - if fld_name is None or fld_name == '': - raise BadConfiguration(f'Field #{idx} has no name') - if fld_name in fields_set: - raise BadConfiguration(f'Duplicate field "{fld_name}"') - fields_set.add(fld_name) - - fld_def2 = { 'Name': fld_name } - for k, v in fld_def.items(): - k2 = k.lower() - if k2 == 'name': - pass - elif k2 in ('alternativename', 'alias'): - setFieldIf(drvoSupportsFieldAlternativeName, - 'AlternativeName', v, fld_def2, fld_name, drvo.ShortName, - log=logging.debug) - elif k2 == 'comment': - setFieldIf(drvoSupportsFieldComment, - 'Comment', v, fld_def2, fld_name, drvo.ShortName, - log=logging.debug) - - elif k2 == 'type': - fld_def2['Type'] = parseFieldType(v) - elif k2 == 'subtype': - fld_def2['SubType'] = parseSubFieldType(v) - elif k2 == 'tz': - fld_def2['TZFlag'] = parseTimeZone(v) - elif k2 == 'width' and v is not None and isinstance(v, int): - setFieldIf(drvoSupportsFieldWidthPrecision, - 'Width', v, fld_def2, fld_name, drvo.ShortName) - elif k2 == 'precision' and v is not None and isinstance(v, int): - setFieldIf(drvoSupportsFieldWidthPrecision, - 'Precision', v, fld_def2, fld_name, drvo.ShortName) - - # constraints - elif k2 == 'default': - setFieldIf(drvoSupportsFieldDefault, - 'Default', v, fld_def2, fld_name, drvo.ShortName) - elif k2 == 'nullable' and v is not None and isinstance(v, bool): - setFieldIf(drvoSupportsFieldNullable, - 'Nullable', v, fld_def2, fld_name, drvo.ShortName) - elif k2 == 'unique' and v is not None and isinstance(v, bool): - setFieldIf(drvoSupportsFieldUnique, - 'Unique', v, fld_def2, fld_name, drvo.ShortName) - else: - raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"') - - fields[idx] = fld_def2 - -def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]): - """Setup output field mapping, modifying the sources dictionary in place.""" - fieldMap = {} - n = defn.GetFieldCount() - for i in range(n): - fld = defn.GetFieldDefn(i) - fldName = fld.GetName() - fieldMap[fldName] = i - - for source in sources: - source_import = source['import'] - - fieldMap2 = source_import.get('field-map', None) - if fieldMap2 is None: - fieldMap2 = fieldMap - else: - if isinstance(fieldMap2, list): - # convert list to identity dictionary - fieldMap2 = { fld: fld for fld in fieldMap2 } - - for ifld, ofld in fieldMap2.items(): - i = fieldMap.get(ofld, None) - if i is None: - raise RuntimeError(f'Ouput layer has no field named "{ofld}"') - fieldMap2[ifld] = i - source_import['field-map'] = fieldMap2 - - # validate field value mapping - valueMap = source_import.get('value-map', None) - if valueMap is not None: - for fldName, rules in valueMap.items(): - if rules is None: - continue - if not isinstance(rules, list): - rules = [rules] - for idx, rule in enumerate(rules): - if rule is None or not isinstance(rule, dict): - raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}') - if 'type' not in rule: - ruleType = rule['type'] = 'literal' - else: - ruleType = rule['type'] - if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or - ruleType is None or ruleType not in ('literal', 'regex')): - raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}') - if ruleType == 'regex': - rule['replace'] = re.compile(rule['replace']) - rules[idx] = ( rule['replace'], rule['with'] ) - -def processOutputLayer(ds : gdal.Dataset, - layername : str, - layerdef : dict[str,Any], - srs : Optional[osr.SpatialReference] = None, - cachedir : Path|None = None, - extent : ogr.Geometry|None = None, - dsTransaction : bool = False, - lyrcache : ogr.Layer|None = None, - force : bool = False) -> ImportStatus: - """Process an output layer.""" - - logging.info('Processing output layer "%s"', layername) - lyr = ds.GetLayerByName(layername) - if lyr is None: - raise RuntimeError(f'Failed to create output layer "{layername}"??') - if not lyr.TestCapability(ogr.OLCSequentialWrite): - raise RuntimeError(f'Output layer "{layername}" has no working ' - 'CreateFeature() method') - - sources = layerdef['sources'] - if not (lyrcache is None or force or - areSourceFilesNewer(layername, sources=sources, - lyrcache=lyrcache, - cachedir=cachedir)): - logging.info('Output layer "%s" is up to date, skipping', layername) - return ImportStatus.IMPORT_NOCHANGE - - validateOutputLayer(lyr, srs=srs, options=layerdef['create']) - - description = layerdef.get('description', None) - if (description is not None and - lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None): - logging.warning('Could not set description metadata') - - # setup output field mapping in the sources dictionary - setOutputFieldMap(lyr.GetLayerDefn(), sources) - - return importSources(lyr=lyr, sources=sources, - cachedir=cachedir, extent=extent, - dsoTransaction=dsTransaction, - lyrcache=lyrcache, - force=force, - cluster_geometry=layerdef.get('cluster-geometry', False)) - -def validate_sources(layers : dict[str, Any]) -> None: - """Mangle and validate layer sources and import definitions""" - toremove = set() - for layername, layerdefs in layers.items(): - sources = layerdefs.get('sources', None) - if sources is None or len(sources) < 1: - logging.warning('Output layer "%s" has no definition, skipping', layername) - toremove.add(layername) - continue - - for idx, layerdef in enumerate(sources): - importdef = layerdef.get('import', None) - if importdef is None: - raise BadConfiguration(f'Source #{idx} of output layer "{layername}" ' - 'has no import definition') - - sourcedef = layerdef.get('source', None) - unar = None if sourcedef is None else sourcedef.get('unar', None) - src = None if sourcedef is None else sourcedef.get('path', None) - - ds_srcpath = importdef.get('path', None) - if src is None and unar is None and ds_srcpath is not None: - # fallback to importe:path if there is no unarchiving recipe - src = ds_srcpath - if unar is not None and ds_srcpath is None: - raise BadConfiguration(f'Source #{idx} of output layer "{layername}" ' - 'has no import source path') - if src is None: - raise BadConfiguration(f'Source #{idx} of output layer "{layername}" ' - 'has no source path') - layerdef['source'] = { 'path': src, 'unar': unar } - - for layername in toremove: - layers.pop(layername) - -def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int, - name : str, - typ : int, - subtyp : int = ogr.OFSTNone, - width : int = 0, - unique : Optional[bool] = None, - nullable : Optional[bool] = None) -> bool: - """Validate field #idx from the layer cache table.""" - n = defn.GetFieldCount() - if idx >= n: - return False - defn = defn.GetFieldDefn(idx) - - b = True - name2 = defn.GetName() - if name2 != name: - logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, name2, name) - b = False - - if nullable is not None and defn.IsNullable() != nullable: - # non-fatal - logging.warning('Layer cache\'s field #%d ("%s") %s nullable', - idx, name2, 'is' if defn.IsNullable() else 'isn\'t') - - if unique is not None and defn.IsUnique() != unique: - # non-fatal - logging.warning('Layer cache\'s field #%d ("%s") %s unique', - idx, name2, 'is' if defn.IsUnique() else 'isn\'t') - - typ2 = defn.GetType() - if typ2 != typ: - logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, name2, - ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ)) - b = False - - subtyp2 = defn.GetSubType() - if subtyp2 != subtyp: - logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, name2, - ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp)) - b = False - - width2 = defn.GetWidth() - if width2 != 0 and (width == 0 or width2 < width): - # non-fatal - logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)', - idx, name2, width2, width) - return b - -def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool: - """Validate layer cache table.""" - drvName = ds.GetDriver().ShortName - if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB() - logging.warning('Unsupported cache layer for output driver %s', drvName) - return False - lyr = ds.GetLayerByName(name) - if lyr is None: - logging.warning('Table "%s" does not exist', name) - return False - - if not (lyr.TestCapability(ogr.OLCRandomWrite) and lyr.TestCapability(ogr.OLCUpdateFeature)): - logging.warning('Layer "%s" does not support OLCUpdateFeature capability, ' - 'ignoring cache', name) - return False - - defn = lyr.GetLayerDefn() - fields = [ - { 'name': 'layername', 'typ': ogr.OFTString, - 'nullable': False, 'unique': True, 'width': 255 }, - { 'name': 'last_updated', 'typ': ogr.OFTDateTime, - 'nullable': False }, - { 'name': 'fingerprint', 'typ': ogr.OFTBinary, - 'nullable': False, 'width': 32 }, - ] - m = len(fields) - n = defn.GetFieldCount() - if n < m: - # this is fatal, and `all(bs)` is False so we return False below - logging.warning('Layer cache "%s" has %d < %d fields', name, n, m) - elif n != m: - logging.warning('Layer cache "%s" has %d != %d fields', name, n, m) - bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ] - if not all(bs): - return False - - n = defn.GetGeomFieldCount() - if n > 0: - geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName()) - for i in range(n) ] - logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s', - name, n, ', '.join(geomFieldNames)) - - style = lyr.GetStyleTable() - if style is not None: - logging.warning('Layer cache "%s" has a style table "%s"', - name, style.GetLastStyleName()) - return True - -def areSourceFilesNewer(layername : str, - sources : dict[str,Any], - lyrcache : ogr.Layer, - cachedir : Optional[Path] = None) -> bool: - """Return a boolean indicating whether the layer cache is up to date with - respect to the source files found on disk. That is, the last modification - and last changed time of each source file needs to be equal or lower than - the `last_updated` value found in the layer cache.""" - - source_paths = set() - for source in sources: - # the same source_path can be used multiple times, stat(2) only once - source_path = source['source']['path'] - source_paths.add(source_path) - if len(source_paths) == 0: - return False - - t = None - mtimes_ns = {} - for source_path in source_paths: - path = source_path if cachedir is None else str(cachedir.joinpath(source_path)) - try: - st = os.stat(path) - if not S_ISREG(st.st_mode): - raise FileNotFoundError - mtimes_ns[source_path] = st.st_mtime_ns - # take max(mtime, ctime): if we lock source paths any update after - # aquiring the lock will yield a value larger than time.time_ns() - t2 = max(st.st_mtime_ns, st.st_ctime_ns) - if t is None or t < t2: - t = t2 - except (OSError, ValueError): - #logging.warning('Could not stat(%s)', path) - return True - assert t is not None - - attributeFilter = 'layername = ' + escape_literal_str(layername) - logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter) - lyrcache.SetAttributeFilter(attributeFilter) - - feature = lyrcache.GetNextFeature() - if feature is None: - # not in cache - return True - - if not feature.IsFieldSetAndNotNull(1): - ret = True - else: - # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime - # [ year, month, day, hour, minute, second, timezone flag ] - dt = feature.GetFieldAsDateTime(1) - if dt[6] == ogr.TZFLAG_UNKNOWN: - logging.warning('Datetime specified with unknown timezone in layer cache\'s ' - 'field #%d "%s", assuming local time', 1, - feature.GetDefnRef().GetFieldDefn(1).GetName()) - tz = None - elif dt[6] == ogr.TZFLAG_LOCALTIME: - tz = None - elif dt[6] == ogr.TZFLAG_UTC: - tz = UTC - else: - tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900)) - ms, s = modf(dt[5]) - dt = datetime( - year=dt[0], # including century - month=dt[1], # 01 ≤ year ≤ 12 - day=dt[2], # 01 ≤ day ≤ 31 - hour=dt[3], # 00 ≤ hour ≤ 23 - minute=dt[4], # 00 ≤ minute ≤ 59 - second=int(s), # 00 ≤ second ≤ 59 - microsecond=round(ms*1000000), - tzinfo=tz - ) - fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None - logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s', - layername, - dt.isoformat(timespec='microseconds'), - fpr.hex() if fpr is not None else 'NULL') - ret = int(dt.timestamp() * 1000000.) * 1000 < t - - if lyrcache.GetNextFeature() is not None: - raise RuntimeError(f'Duplicate key {layername}') - - if not ret: - for source_path, mtime_ns in sorted(mtimes_ns.items()): - # XXX datetime.fromtimestamp() doesn't support nanosecond input - # https://github.com/python/cpython/issues/59648 - mtime = (mtime_ns // 1000) / 1000000. - dt = datetime.fromtimestamp(mtime) - logging.info('Source file %s is unchanged (last modification time %s)', - source_path, dt.astimezone().isoformat(timespec='seconds')) - return ret - -def getLastMTimes(layerdefs : dict[str,Any], basedir : Optional[Path] = None) -> dict[str,int]: - """Return a directing mapping source paths to their last modification time - (as a timestamp in milliseconds).""" - ret = {} - for layerdef in layerdefs: - for source in layerdef['sources']: - source_path = source['source']['path'] - if source_path in ret: - continue - path = source_path if basedir is None else str(basedir.joinpath(source_path)) - try: - st = os.stat(path) - if not S_ISREG(st.st_mode): - raise FileNotFoundError - ret[source_path] = st.st_mtime_ns // 1000000 - except (OSError, ValueError): - #logging.warning('Could not stat(%s)', path) - pass - return ret - -def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]: - """Place shared locks on each source path and return their respective file - descriptors. We could do that one layerdef at a time (one output layer at a - time) to reduce the time during which the sources prevented from being - updated/downloaded, but their is some value in having consistency across the - whole import process.""" - umask = os.umask(0o002) - lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY) - try: - ret = {} - for layerdef in layerdefs: - for source in layerdef['sources']: - source_path = source['source']['path'] - if source_path in ret: - continue - lockfile = getSourcePathLockFileName(source_path) - lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664, - dir_fd=lockdir_fd) - logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)', - source_path, lockfile) - flock(lock_fd, LOCK_SH) - ret[source_path] = lock_fd - return ret - finally: - try: - os.close(lockdir_fd) - except (OSError, ValueError): - logging.exception('Could not close lockdir') - os.umask(umask) - -def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None: - """Release shared locks on the source paths. Closed FDs are removed from - the dictionary in place.""" - toremove = set() - for path, lock_fd in lock_fds.items(): - try: - os.close(lock_fd) - except (OSError, ValueError): - logging.exception('Could not release lock for %s', path) - else: - logging.debug('Released lock for %s', path) - toremove.add(path) - for path in toremove: - lock_fds.pop(path) - -# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements -def main() -> NoReturn: - common.init_logger(app=os.path.basename(__file__), level=logging.INFO) - - parser = argparse.ArgumentParser(description='Extract and import GIS layers.') - parser.add_argument('--cachedir', default=None, - help=f'cache directory (default: {os.curdir})') - parser.add_argument('--debug', action='count', default=0, - help=argparse.SUPPRESS) - parser.add_argument('--lockfile', default=None, - help='obtain an exclusive lock before processing') - parser.add_argument('--lockdir-sources', default=None, - help='optional directory for lock files to source paths') - parser.add_argument('--mvtdir', default=None, - help='optional directory for Mapbox Vector Tiles (MVT)') - parser.add_argument('--mvt-compress', default=False, action='store_true', - help='whether to compress Mapbox Vector Tiles (MVT) files') - parser.add_argument('--rasterdir', default=None, - help='optional directory for raster files') - parser.add_argument('--metadata-compress', default=False, action='store_true', - help='whether to compress metadata.json files') - parser.add_argument('--force', default=False, action='store_true', - help='import even if no new changes were detected') - parser.add_argument('groupname', nargs='*', help='group layer name(s) to process') - args = parser.parse_args() - - if args.debug > 0: # pylint: disable=duplicate-code - logging.getLogger().setLevel(logging.DEBUG) - if args.debug > 1: - gdal.ConfigurePythonLogging(enable_debug=True) - - config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname) - - # validate configuration - if 'dataset' not in config: - raise BadConfiguration('Configuration does not specify output dataset') - - layers = config.get('layers', {}) - validate_sources(layers) - - # set global GDAL/OGR configuration options - for pszKey, pszValue in config.get('GDALconfig', {}).items(): - logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue) - gdal.SetConfigOption(pszKey, pszValue) - - # get configured Spatial Reference System and extent - srs = getSRS(config.get('SRS', None)) - extent = getExtent(config.get('extent', None), srs=srs) - - if args.lockfile is not None: - # obtain an exclusive lock and don't release it until exiting the program - lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) - logging.debug('flock("%s", LOCK_EX)', args.lockfile) - flock(lock_fd, LOCK_EX) - - if args.mvtdir is not None: - args.mvtdir = Path(args.mvtdir) - if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it - raise RuntimeError('Invalid value for --mvtdir') - args.mvtdir.parent.mkdir(parents=True, exist_ok=True) - - if args.cachedir is not None: - args.cachedir = Path(args.cachedir) - if args.lockdir_sources is None: - sourcePathLocks = None - else: - sourcePathLocks = lockSourcePaths(layerdefs=layers.values(), - lockdir=args.lockdir_sources) - - # special handling for raster layers - if any(l.get('type') == 'raster' for l in layers.values()): - if not all(l.get('type') == 'raster' for l in layers.values()): - raise NotImplementedError('Mix of raster and vector layers is not supported') - if args.rasterdir is None: - raise RuntimeError('Missing required value for --rasterdir') - if len(layers) != 1: - raise RuntimeError('Raster layers need to be processed one at a time') - args.rasterdir = Path(args.rasterdir) - if args.rasterdir == Path(): # make sure it's not curdir as we don't want to exchange it - raise RuntimeError('Invalid value for --rasterdir') - args.rasterdir.parent.mkdir(parents=True, exist_ok=True) - last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir) - rv = 0 - for layername, layerdef in layers.items(): - try: - processRaster(layername, layerdef, - sources=parse_config_dl(config.get('downloads', [])), - license_info=config.get('license-info', {}), - last_modified=last_modified, - dst=args.rasterdir, - cachedir=args.cachedir, - extent=extent, - compress_metadata=args.metadata_compress) - except Exception: # pylint: disable=broad-exception-caught - rv = 1 - traceback.print_exc() - sys.exit(rv) - - # open output dataset (possibly create it first) - dso = openOutputDS(config['dataset']) - - validate_schema(layers, - drvo=dso.GetDriver(), - lco_defaults=config['dataset'].get('create-layer-options', None)) - - # create all output layers before starting the transaction - for layername, layerdef in layers.items(): - lyr = dso.GetLayerByName(layername) - if lyr is not None: - # TODO dso.DeleteLayer(layername) if --overwrite and - # dso.TestCapability(ogr.ODsCDeleteLayer) - # (Sets OVERWRITE=YES for PostgreSQL and GPKG.) - continue - if not dso.TestCapability(ogr.ODsCCreateLayer): - raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not ' - 'support layer creation') - createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None)) - - if (dso.TestCapability(ogr.ODsCTransactions) and - # we need SAVEPOINT support - dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')): - logging.debug('Starting transaction') - dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE - else: - logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs', - dso.GetDriver().ShortName) - dsoTransaction = False - - # validate layer cache - lyr_cache = config['dataset'].get('layercache', None) - if lyr_cache is None: - pass - elif validateCacheLayer(dso, lyr_cache): - lyr_cache = dso.GetLayerByName(lyr_cache) - else: - if not args.force: - logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache) - args.force = True - lyr_cache = None - - rv = 0 - try: - r = {} - n = 0 - start = time_monotonic() - for layername, layerdef in layers.items(): - r[layername] = r0 = processOutputLayer(dso, layername, layerdef, - srs=srs, - cachedir=args.cachedir, - extent=extent, - dsTransaction=dsoTransaction, - lyrcache=lyr_cache, - force=args.force) - n += 1 - logging.info('Import result status for layer "%s": %s', layername, str(r0)) - if r0 == ImportStatus.IMPORT_ERROR: - rv = 1 - if dsoTransaction: - dsoTransaction = False - logging.debug('Rolling back transaction') - # no need to catch the exception here - if dso.CommitTransaction() != ogr.OGRERR_NONE: - logging.error('Could not rollback transaction') - break - elapsed = time_monotonic() - start - logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed)) - - # get mtimes before releasing the source locks - last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir) - - if sourcePathLocks is not None: - releaseSourcePathLocks(sourcePathLocks) - - export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None } - if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()): - pass - elif len(export_layers) == 0: - logging.warning('--mvtdir option used but no layer has a publication definition') - elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers) - and args.mvtdir.is_dir()): - logging.info('Skipping MVT export for group %s (no changes)', - ', '.join(args.groupname) if args.groupname is not None else '*') - else: - exportMVT(dso, - layers=export_layers, - sources=parse_config_dl(config.get('downloads', [])), - license_info=config.get('license-info', {}), - last_modified=last_modified, - dst=args.mvtdir, - default_options=config.get('vector-tiles', None), - compress=args.mvt_compress, - compress_metadata=args.metadata_compress) - - if dsoTransaction: - dsoTransaction = False - logging.debug('Committing transaction') - if dso.CommitTransaction() != ogr.OGRERR_NONE: - logging.error('Could not commit transaction') - rv = 1 - - except Exception: # pylint: disable=broad-exception-caught - if dsoTransaction: - logging.exception('Exception occured within transaction, rolling back') - try: - if dso.RollbackTransaction() != ogr.OGRERR_NONE: - logging.error('Could not rollback transaction') - except Exception: # pylint: disable=broad-exception-caught - logging.exception('Could not rollback transaction') - else: - traceback.print_exc() - sys.exit(1) - - finally: - lyr_cache = None - dso = None - extent = None - srs = None - sys.exit(rv) - -gdal.UseExceptions() -main() -- cgit v1.2.3