aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-import
diff options
context:
space:
mode:
Diffstat (limited to 'webmap-import')
-rwxr-xr-xwebmap-import794
1 files changed, 0 insertions, 794 deletions
diff --git a/webmap-import b/webmap-import
deleted file mode 100755
index 2f0f5b4..0000000
--- a/webmap-import
+++ /dev/null
@@ -1,794 +0,0 @@
-#!/usr/bin/python3
-
-#----------------------------------------------------------------------
-# Backend utilities for the Klimatanalys Norr project (extract/import layers)
-# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-#----------------------------------------------------------------------
-
-# pylint: disable=invalid-name, missing-module-docstring, fixme
-
-from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
-import os
-from stat import S_ISREG
-import sys
-from fcntl import flock, LOCK_EX, LOCK_SH
-import logging
-import argparse
-import re
-from datetime import datetime, timedelta, timezone, UTC
-from math import modf
-from pathlib import Path
-from time import monotonic as time_monotonic
-from typing import Any, Optional, NoReturn
-import traceback
-
-from osgeo import gdal, ogr, osr
-from osgeo.gdalconst import (
- CE_None as GDAL_CE_None,
- DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
- DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
- DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
-)
-from osgeo import gdalconst
-
-import common
-from common import (
- BadConfiguration,
- parse_config_dl,
- escape_identifier,
- escape_literal_str,
- getSourcePathLockFileName
-)
-from common_gdal import (
- gdalGetMetadataItem,
- getSRS,
- getExtent,
- parseGeomType,
- parseFieldType,
- parseSubFieldType,
- parseTimeZone
-)
-from import_source import (
- openOutputDS,
- createOutputLayer,
- validateOutputLayer,
- importSources,
- ImportStatus
-)
-from export_mvt import exportMVT
-from export_raster import processRaster
-
def setFieldIf(cond : bool,
               attrName : str,
               val : Any,
               data : dict[str, Any],
               fldName : str,
               drvName : str,
               log = logging.warning) -> None:
    """Set data[attrName] = val when cond holds; otherwise log (at the
    given level) that the attribute is ignored because the output driver
    does not support it."""
    if not cond:
        # quote string values in the log message for readability
        rendered = '"' + val + '"' if isinstance(val, str) else str(val)
        log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
            attrName, rendered, fldName, drvName)
        return
    data[attrName] = val
-
# pylint: disable-next=too-many-branches
def validate_schema(layers : dict[str, Any],
                    drvo : Optional[gdal.Driver] = None,
                    lco_defaults : Optional[dict[str, str]] = None) -> None:
    """Validate layer creation options and schema. The schema is
    modified in place with the parsed result.
    (We need the driver of the output dataset to determine capability on
    constraints.)

    NOTE(review): despite the Optional annotation, drvo is dereferenced
    unconditionally below, so callers must always supply a driver — confirm.
    """

    # list of capability flags supported by the CreateField() API
    drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
    drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
    drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags

    # cache driver capabilities
    drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
    drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
                                 gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
    drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
                               gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
    drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
                                gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
    drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags

    for layername, layerdef in layers.items():
        create = layerdef.get('create', None)
        if create is None or len(create) < 1:
            logging.warning('Layer "%s" has no creation schema', layername)
            continue

        # prepend global layer creation options (dataset:create-layer-options)
        # and build the option=value list
        lco = create.get('options', None)
        if lco_defaults is not None or lco is not None:
            options = []
            if lco_defaults is not None:
                options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
            if lco is not None:
                options += [ k + '=' + str(v) for k, v in lco.items() ]
            create['options'] = options

        # parse geometry type
        create['geometry-type'] = parseGeomType(create.get('geometry-type', None))

        fields = create.get('fields', None)
        if fields is None:
            create['fields'] = []
        else:
            # guard against duplicate field names within a layer
            fields_set = set()
            for idx, fld_def in enumerate(fields):
                fld_name = fld_def.get('name', None)
                if fld_name is None or fld_name == '':
                    raise BadConfiguration(f'Field #{idx} has no name')
                if fld_name in fields_set:
                    raise BadConfiguration(f'Duplicate field "{fld_name}"')
                fields_set.add(fld_name)

                # normalize the field definition into the capitalized keys
                # used downstream, silently dropping (or logging) attributes
                # the output driver does not support (see setFieldIf)
                fld_def2 = { 'Name': fld_name }
                for k, v in fld_def.items():
                    k2 = k.lower()
                    if k2 == 'name':
                        pass
                    elif k2 in ('alternativename', 'alias'):
                        setFieldIf(drvoSupportsFieldAlternativeName,
                                   'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)
                    elif k2 == 'comment':
                        setFieldIf(drvoSupportsFieldComment,
                                   'Comment', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)

                    elif k2 == 'type':
                        fld_def2['Type'] = parseFieldType(v)
                    elif k2 == 'subtype':
                        fld_def2['SubType'] = parseSubFieldType(v)
                    elif k2 == 'tz':
                        fld_def2['TZFlag'] = parseTimeZone(v)
                    elif k2 == 'width' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Width', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'precision' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Precision', v, fld_def2, fld_name, drvo.ShortName)

                    # constraints
                    elif k2 == 'default':
                        setFieldIf(drvoSupportsFieldDefault,
                                   'Default', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'nullable' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldNullable,
                                   'Nullable', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'unique' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldUnique,
                                   'Unique', v, fld_def2, fld_name, drvo.ShortName)
                    else:
                        # NOTE(review): a known key ('width', 'nullable', …)
                        # whose value has the wrong type also falls through to
                        # here and is reported as an *unknown* key — confirm
                        # whether that error message is intended
                        raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')

                fields[idx] = fld_def2
-
def setOutputFieldMap(defn : ogr.FeatureDefn, sources : dict[str, Any]) -> None:
    """Setup output field mapping, modifying the sources dictionary in place.

    For each source, the `import:field-map` entry is normalized into a
    dictionary mapping input field names to output field *indices* (looked
    up in the output layer definition `defn`); when absent, the identity
    mapping over all output fields is used.  The optional `import:value-map`
    rules are validated and compiled into (replace, with) tuples, with
    regex rules pre-compiled.

    :param defn: feature definition of the output layer
    :param sources: iterable of source definitions (each with an 'import' key)
    :raises RuntimeError: on an unknown output field name or malformed rule
    """
    # map output field names to their index in the layer definition
    fieldMap = {}
    n = defn.GetFieldCount()
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        fldName = fld.GetName()
        fieldMap[fldName] = i

    for source in sources:
        source_import = source['import']

        fieldMap2 = source_import.get('field-map', None)
        if fieldMap2 is None:
            # no explicit mapping: use the identity mapping (name → index)
            fieldMap2 = fieldMap
        else:
            if isinstance(fieldMap2, list):
                # convert list to identity dictionary
                fieldMap2 = { fld: fld for fld in fieldMap2 }

            # resolve output field names to indices
            for ifld, ofld in fieldMap2.items():
                i = fieldMap.get(ofld, None)
                if i is None:
                    raise RuntimeError(f'Output layer has no field named "{ofld}"')
                fieldMap2[ifld] = i
        source_import['field-map'] = fieldMap2

        # validate field value mapping
        valueMap = source_import.get('value-map', None)
        if valueMap is not None:
            for fldName, rules in valueMap.items():
                if rules is None:
                    continue
                if not isinstance(rules, list):
                    # NOTE(review): wrapping a lone rule dict in a fresh list
                    # means the compiled (replace, with) tuples below are NOT
                    # written back to valueMap[fldName] — confirm whether the
                    # configuration always supplies lists here
                    rules = [rules]
                for idx, rule in enumerate(rules):
                    if rule is None or not isinstance(rule, dict):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    # default to a literal (exact string) match
                    if 'type' not in rule:
                        ruleType = rule['type'] = 'literal'
                    else:
                        ruleType = rule['type']
                    # exactly {type, replace, with} with a known type
                    if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
                        ruleType is None or ruleType not in ('literal', 'regex')):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if ruleType == 'regex':
                        # pre-compile regular expressions once
                        rule['replace'] = re.compile(rule['replace'])
                    rules[idx] = ( rule['replace'], rule['with'] )
-
def processOutputLayer(ds : gdal.Dataset,
                       layername : str,
                       layerdef : dict[str,Any],
                       srs : Optional[osr.SpatialReference] = None,
                       cachedir : Path|None = None,
                       extent : ogr.Geometry|None = None,
                       dsTransaction : bool = False,
                       lyrcache : ogr.Layer|None = None,
                       force : bool = False) -> ImportStatus:
    """Process an output layer.

    Looks up the (pre-created) layer in the output dataset, returns early
    when the layer cache indicates the source files are unchanged (unless
    `force`), validates the output schema, then imports all configured
    sources via importSources().

    :param ds: output dataset; the layer must already exist in it
    :param layername: name of the output layer
    :param layerdef: layer configuration ('sources', 'create', optional
        'description' and 'cluster-geometry')
    :param srs: expected spatial reference system of the output layer
    :param cachedir: optional base directory for downloaded source files
    :param extent: optional geometry passed down to importSources()
    :param dsTransaction: whether a dataset-level transaction is active
    :param lyrcache: optional layer-cache table (see validateCacheLayer)
    :param force: import even when sources appear unchanged
    :return: importSources()' status, or IMPORT_NOCHANGE when skipped
    :raises RuntimeError: when the layer is missing or not writable
    """

    logging.info('Processing output layer "%s"', layername)
    lyr = ds.GetLayerByName(layername)
    if lyr is None:
        raise RuntimeError(f'Failed to create output layer "{layername}"??')
    if not lyr.TestCapability(ogr.OLCSequentialWrite):
        raise RuntimeError(f'Output layer "{layername}" has no working '
                            'CreateFeature() method')

    sources = layerdef['sources']
    # skip the layer entirely when a cache is available and no source file
    # is newer than the cached last_updated timestamp
    if not (lyrcache is None or force or
            areSourceFilesNewer(layername, sources=sources,
                                lyrcache=lyrcache,
                                cachedir=cachedir)):
        logging.info('Output layer "%s" is up to date, skipping', layername)
        return ImportStatus.IMPORT_NOCHANGE

    validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

    # a failure to set the description is non-fatal
    description = layerdef.get('description', None)
    if (description is not None and
        lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
        logging.warning('Could not set description metadata')

    # setup output field mapping in the sources dictionary
    setOutputFieldMap(lyr.GetLayerDefn(), sources)

    return importSources(lyr=lyr, sources=sources,
                         cachedir=cachedir, extent=extent,
                         dsoTransaction=dsTransaction,
                         lyrcache=lyrcache,
                         force=force,
                         cluster_geometry=layerdef.get('cluster-geometry', False))
-
def validate_sources(layers : dict[str, Any]) -> None:
    """Mangle and validate layer sources and import definitions.

    Each source gains a normalized 'source' entry {'path': …, 'unar': …};
    layers without any source definition are dropped from `layers`."""
    skipped = set()
    for layername, layerdefs in layers.items():
        sources = layerdefs.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Output layer "%s" has no definition, skipping', layername)
            skipped.add(layername)
            continue

        for idx, layerdef in enumerate(sources):
            importdef = layerdef.get('import', None)
            if importdef is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import definition')

            sourcedef = layerdef.get('source', None)
            unar = sourcedef.get('unar', None) if sourcedef is not None else None
            src = sourcedef.get('path', None) if sourcedef is not None else None

            ds_srcpath = importdef.get('path', None)
            if src is None and unar is None and ds_srcpath is not None:
                # fall back to import:path when there is no unarchiving recipe
                src = ds_srcpath
            if unar is not None and ds_srcpath is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no import source path')
            if src is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                       'has no source path')
            layerdef['source'] = { 'path': src, 'unar': unar }

    # can't pop while iterating, so remove skipped layers afterwards
    for layername in skipped:
        layers.pop(layername)
-
def validateLayerCacheField(defn : ogr.FeatureDefn, idx : int,
                            name : str,
                            typ : int,
                            subtyp : int = ogr.OFSTNone,
                            width : int = 0,
                            unique : Optional[bool] = None,
                            nullable : Optional[bool] = None) -> bool:
    """Check field #idx of the layer cache table against the expected
    definition.  Name, type and subtype mismatches are fatal (False is
    returned); nullable/unique/width mismatches only log a warning."""
    if idx >= defn.GetFieldCount():
        return False
    fld = defn.GetFieldDefn(idx)

    ok = True
    fldName = fld.GetName()
    if fldName != name:
        logging.warning('Layer cache\'s field #%d has name "%s" != "%s"', idx, fldName, name)
        ok = False

    if nullable is not None and fld.IsNullable() != nullable:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s nullable',
                        idx, fldName, 'is' if fld.IsNullable() else 'isn\'t')

    if unique is not None and fld.IsUnique() != unique:
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") %s unique',
                        idx, fldName, 'is' if fld.IsUnique() else 'isn\'t')

    fldType = fld.GetType()
    if fldType != typ:
        logging.warning('Layer cache\'s field #%d ("%s") has type %s != %s', idx, fldName,
                        ogr.GetFieldTypeName(fldType), ogr.GetFieldTypeName(typ))
        ok = False

    fldSubType = fld.GetSubType()
    if fldSubType != subtyp:
        logging.warning('Layer cache\'s field #%d ("%s") has subtype %s != %s', idx, fldName,
                        ogr.GetFieldSubTypeName(fldSubType), ogr.GetFieldSubTypeName(subtyp))
        ok = False

    # a narrower-than-expected (but set) width is tolerated with a warning
    fldWidth = fld.GetWidth()
    if fldWidth != 0 and (width == 0 or fldWidth < width):
        # non-fatal
        logging.warning('Layer cache\'s field #%d ("%s") is too small (width %d < %d)',
                        idx, fldName, fldWidth, width)
    return ok
-
def validateCacheLayer(ds : gdal.Dataset, name : str) -> bool:
    """Validate layer cache table.

    Returns True when table `name` exists in `ds` and is usable as a layer
    cache: PostgreSQL output driver, random-write/update capabilities, and
    the expected (layername, last_updated, fingerprint) schema.  Extra
    geometry fields or a style table are tolerated with a warning.
    """
    drvName = ds.GetDriver().ShortName
    if drvName != 'PostgreSQL': # we need hash_record_extended(), sha256() and ST_AsEWKB()
        logging.warning('Unsupported cache layer for output driver %s', drvName)
        return False
    lyr = ds.GetLayerByName(name)
    if lyr is None:
        logging.warning('Table "%s" does not exist', name)
        return False

    # the cache is updated in place, so UPDATE support is required
    if not (lyr.TestCapability(ogr.OLCRandomWrite) and lyr.TestCapability(ogr.OLCUpdateFeature)):
        logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
                        'ignoring cache', name)
        return False

    # expected schema, validated field by field below
    defn = lyr.GetLayerDefn()
    fields = [
        { 'name': 'layername', 'typ': ogr.OFTString,
          'nullable': False, 'unique': True, 'width': 255 },
        { 'name': 'last_updated', 'typ': ogr.OFTDateTime,
          'nullable': False },
        { 'name': 'fingerprint', 'typ': ogr.OFTBinary,
          'nullable': False, 'width': 32 },
    ]
    m = len(fields)
    n = defn.GetFieldCount()
    if n < m:
        # this is fatal, and `all(bs)` is False so we return False below
        logging.warning('Layer cache "%s" has %d < %d fields', name, n, m)
    elif n != m:
        # extra fields are non-fatal
        logging.warning('Layer cache "%s" has %d != %d fields', name, n, m)
    bs = [ validateLayerCacheField(defn, i, **fld) for i,fld in enumerate(fields) ]
    if not all(bs):
        return False

    # geometry fields are unexpected on a cache table but not fatal
    n = defn.GetGeomFieldCount()
    if n > 0:
        geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
                           for i in range(n) ]
        logging.warning('Layer cache "%s" has %d > 0 geometry field(s): %s',
                        name, n, ', '.join(geomFieldNames))

    # likewise a style table is unexpected but not fatal
    style = lyr.GetStyleTable()
    if style is not None:
        logging.warning('Layer cache "%s" has a style table "%s"',
                        name, style.GetLastStyleName())
    return True
-
def areSourceFilesNewer(layername : str,
                        sources : dict[str,Any],
                        lyrcache : ogr.Layer,
                        cachedir : Optional[Path] = None) -> bool:
    """Return a boolean indicating whether the layer cache is up to date with
    respect to the source files found on disk. That is, the last modification
    and last changed time of each source file needs to be equal or lower than
    the `last_updated` value found in the layer cache.

    Any stat(2) failure, a missing cache entry, or an unset `last_updated`
    field forces a re-import (True is returned).
    :raises RuntimeError: when the cache holds duplicate rows for `layername`
    """

    # the same source_path can be used multiple times, stat(2) only once
    source_paths = set()
    for source in sources:
        source_path = source['source']['path']
        source_paths.add(source_path)
    if len(source_paths) == 0:
        return False

    # t = most recent mtime/ctime (in ns) over all source files
    t = None
    mtimes_ns = {}
    for source_path in source_paths:
        path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
        try:
            st = os.stat(path)
            if not S_ISREG(st.st_mode):
                # treat non-regular files like missing ones
                raise FileNotFoundError
            mtimes_ns[source_path] = st.st_mtime_ns
            # take max(mtime, ctime): if we lock source paths any update after
            # acquiring the lock will yield a value larger than time.time_ns()
            t2 = max(st.st_mtime_ns, st.st_ctime_ns)
            if t is None or t < t2:
                t = t2
        except (OSError, ValueError):
            # a file we can't stat() always triggers a re-import
            #logging.warning('Could not stat(%s)', path)
            return True
    assert t is not None

    # look up this layer's row in the cache table
    attributeFilter = 'layername = ' + escape_literal_str(layername)
    logging.debug('SetAttributeFilter("%s", "%s")', lyrcache.GetName(), attributeFilter)
    lyrcache.SetAttributeFilter(attributeFilter)

    feature = lyrcache.GetNextFeature()
    if feature is None:
        # not in cache
        return True

    # field #1 is last_updated (see validateCacheLayer)
    if not feature.IsFieldSetAndNotNull(1):
        ret = True
    else:
        # https://gdal.org/en/stable/api/python/vector_api.html#osgeo.ogr.Feature.GetFieldAsDateTime
        # [ year, month, day, hour, minute, second, timezone flag ]
        dt = feature.GetFieldAsDateTime(1)
        if dt[6] == ogr.TZFLAG_UNKNOWN:
            logging.warning('Datetime specified with unknown timezone in layer cache\'s '
                            'field #%d "%s", assuming local time', 1,
                            feature.GetDefnRef().GetFieldDefn(1).GetName())
            tz = None
        elif dt[6] == ogr.TZFLAG_LOCALTIME:
            tz = None
        elif dt[6] == ogr.TZFLAG_UTC:
            tz = UTC
        else:
            # OGR TZ flags encode the UTC offset in 15-minute increments
            tz = timezone(offset=timedelta(seconds=(dt[6] - ogr.TZFLAG_UTC) * 900))
        # split float seconds into whole seconds and microseconds
        ms, s = modf(dt[5])
        dt = datetime(
            year=dt[0], # including century
            month=dt[1], # 01 ≤ year ≤ 12
            day=dt[2], # 01 ≤ day ≤ 31
            hour=dt[3], # 00 ≤ hour ≤ 23
            minute=dt[4], # 00 ≤ minute ≤ 59
            second=int(s), # 00 ≤ second ≤ 59
            microsecond=round(ms*1000000),
            tzinfo=tz
        )
        # field #2 is the fingerprint (only used for logging here)
        fpr = feature.GetFieldAsBinary(2) if feature.IsFieldSetAndNotNull(2) else None
        logging.debug('Found entry in layer cache for "%s", last_updated=%s, fingerprint=%s',
                      layername,
                      dt.isoformat(timespec='microseconds'),
                      fpr.hex() if fpr is not None else 'NULL')
        # compare at nanosecond resolution (µs-precision timestamp × 1000)
        ret = int(dt.timestamp() * 1000000.) * 1000 < t

    # layername is supposed to be UNIQUE, enforce it
    if lyrcache.GetNextFeature() is not None:
        raise RuntimeError(f'Duplicate key {layername}')

    if not ret:
        for source_path, mtime_ns in sorted(mtimes_ns.items()):
            # XXX datetime.fromtimestamp() doesn't support nanosecond input
            # https://github.com/python/cpython/issues/59648
            mtime = (mtime_ns // 1000) / 1000000.
            dt = datetime.fromtimestamp(mtime)
            logging.info('Source file %s is unchanged (last modification time %s)',
                         source_path, dt.astimezone().isoformat(timespec='seconds'))
    return ret
-
def getLastMTimes(layerdefs : dict[str,Any], basedir : Optional[Path] = None) -> dict[str,int]:
    """Map each distinct source path to its last modification time, as a
    timestamp in milliseconds.  Paths that cannot be stat(2)ed, or that are
    not regular files, are silently omitted from the result."""
    mtimes = {}
    for layerdef in layerdefs:
        for source in layerdef['sources']:
            source_path = source['source']['path']
            # stat(2) each distinct path only once
            if source_path in mtimes:
                continue
            path = str(basedir.joinpath(source_path)) if basedir is not None else source_path
            try:
                st = os.stat(path)
                if not S_ISREG(st.st_mode):
                    # treat non-regular files like missing ones
                    raise FileNotFoundError
                mtimes[source_path] = st.st_mtime_ns // 1000000
            except (OSError, ValueError):
                #logging.warning('Could not stat(%s)', path)
                pass
    return mtimes
-
def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
    """Place shared locks on each source path and return their respective file
    descriptors. We could do that one layerdef at a time (one output layer at a
    time) to reduce the time during which the sources are prevented from being
    updated/downloaded, but there is some value in having consistency across the
    whole import process."""
    # NOTE(review): the 0o002 umask presumably makes the lock files
    # group-writable so other tooling (e.g. the downloader) can lock them
    # too — confirm
    umask = os.umask(0o002)
    lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
    try:
        ret = {}
        for layerdef in layerdefs:
            for source in layerdef['sources']:
                source_path = source['source']['path']
                # lock each distinct source path only once
                if source_path in ret:
                    continue
                lockfile = getSourcePathLockFileName(source_path)
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                                  dir_fd=lockdir_fd)
                logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                              source_path, lockfile)
                flock(lock_fd, LOCK_SH)
                ret[source_path] = lock_fd
        return ret
    finally:
        # always close the directory FD and restore the caller's umask,
        # even when os.open()/flock() above raised
        try:
            os.close(lockdir_fd)
        except (OSError, ValueError):
            logging.exception('Could not close lockdir')
        os.umask(umask)
-
def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
    """Release shared locks on the source paths.  Successfully closed FDs
    are removed from the dictionary in place; FDs that fail to close are
    kept (and the failure is logged)."""
    closed = set()
    for path, fd in lock_fds.items():
        try:
            os.close(fd)
        except (OSError, ValueError):
            logging.exception('Could not release lock for %s', path)
            continue
        logging.debug('Released lock for %s', path)
        closed.add(path)
    # can't mutate the dictionary while iterating it above
    for path in closed:
        lock_fds.pop(path)
-
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
    """Entry point: parse arguments and configuration, then either process
    raster layers or import vector layers (optionally followed by an MVT
    export).  Always terminates via sys.exit()."""
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    # command-line interface
    parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files to source paths')
    parser.add_argument('--mvtdir', default=None,
                        help='optional directory for Mapbox Vector Tiles (MVT)')
    parser.add_argument('--mvt-compress', default=False, action='store_true',
                        help='whether to compress Mapbox Vector Tiles (MVT) files')
    parser.add_argument('--rasterdir', default=None,
                        help='optional directory for raster files')
    parser.add_argument('--metadata-compress', default=False, action='store_true',
                        help='whether to compress metadata.json files')
    parser.add_argument('--force', default=False, action='store_true',
                        help='import even if no new changes were detected')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    # -d enables our debug logging, -dd also enables GDAL's
    if args.debug > 0: # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    layers = config.get('layers', {})
    validate_sources(layers)

    # set global GDAL/OGR configuration options
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)

    if args.mvtdir is not None:
        args.mvtdir = Path(args.mvtdir)
        if args.mvtdir == Path(): # make sure it's not curdir as we don't want to exchange it
            raise RuntimeError('Invalid value for --mvtdir')
        args.mvtdir.parent.mkdir(parents=True, exist_ok=True)

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)
    if args.lockdir_sources is None:
        sourcePathLocks = None
    else:
        # hold shared locks on every source file for the whole import
        sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
                                          lockdir=args.lockdir_sources)

    # special handling for raster layers: processed one at a time, written
    # to --rasterdir, then exit without touching the output dataset
    if any(l.get('type') == 'raster' for l in layers.values()):
        if not all(l.get('type') == 'raster' for l in layers.values()):
            raise NotImplementedError('Mix of raster and vector layers is not supported')
        if args.rasterdir is None:
            raise RuntimeError('Missing required value for --rasterdir')
        if len(layers) != 1:
            raise RuntimeError('Raster layers need to be processed one at a time')
        args.rasterdir = Path(args.rasterdir)
        if args.rasterdir == Path(): # make sure it's not curdir as we don't want to exchange it
            raise RuntimeError('Invalid value for --rasterdir')
        args.rasterdir.parent.mkdir(parents=True, exist_ok=True)
        last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)
        rv = 0
        for layername, layerdef in layers.items():
            try:
                processRaster(layername, layerdef,
                              sources=parse_config_dl(config.get('downloads', [])),
                              license_info=config.get('license-info', {}),
                              last_modified=last_modified,
                              dst=args.rasterdir,
                              cachedir=args.cachedir,
                              extent=extent,
                              compress_metadata=args.metadata_compress)
            except Exception: # pylint: disable=broad-exception-caught
                rv = 1
                traceback.print_exc()
        sys.exit(rv)

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    validate_schema(layers,
                    drvo=dso.GetDriver(),
                    lco_defaults=config['dataset'].get('create-layer-options', None))

    # create all output layers before starting the transaction
    for layername, layerdef in layers.items():
        lyr = dso.GetLayerByName(layername)
        if lyr is not None:
            # TODO dso.DeleteLayer(layername) if --overwrite and
            # dso.TestCapability(ogr.ODsCDeleteLayer)
            # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
            continue
        if not dso.TestCapability(ogr.ODsCCreateLayer):
            raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
                                'support layer creation')
        createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))

    if (dso.TestCapability(ogr.ODsCTransactions) and
        # we need SAVEPOINT support
        dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
        logging.debug('Starting transaction')
        dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
    else:
        logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
                        dso.GetDriver().ShortName)
        dsoTransaction = False

    # validate layer cache; an invalid cache is ignored and implies --force
    lyr_cache = config['dataset'].get('layercache', None)
    if lyr_cache is None:
        pass
    elif validateCacheLayer(dso, lyr_cache):
        lyr_cache = dso.GetLayerByName(lyr_cache)
    else:
        if not args.force:
            logging.warning('Ignoring invalid layer cache "%s" (implying --force)', lyr_cache)
            args.force = True
        lyr_cache = None

    rv = 0
    try:
        # r maps layername → ImportStatus
        r = {}
        n = 0
        start = time_monotonic()
        for layername, layerdef in layers.items():
            r[layername] = r0 = processOutputLayer(dso, layername, layerdef,
                                                   srs=srs,
                                                   cachedir=args.cachedir,
                                                   extent=extent,
                                                   dsTransaction=dsoTransaction,
                                                   lyrcache=lyr_cache,
                                                   force=args.force)
            n += 1
            logging.info('Import result status for layer "%s": %s', layername, str(r0))
            if r0 == ImportStatus.IMPORT_ERROR:
                rv = 1
                if dsoTransaction:
                    dsoTransaction = False
                    # NOTE(review): the log messages say "rollback" but
                    # CommitTransaction() is called — presumably deliberate
                    # (keep previously imported layers; the failed layer was
                    # undone via SAVEPOINTs in importSources) — confirm
                    logging.debug('Rolling back transaction')
                    # no need to catch the exception here
                    if dso.CommitTransaction() != ogr.OGRERR_NONE:
                        logging.error('Could not rollback transaction')
                break
        elapsed = time_monotonic() - start
        logging.info('Processed %d destination layers in %s', n, common.format_time(elapsed))

        # get mtimes before releasing the source locks
        last_modified = getLastMTimes(layerdefs=layers.values(), basedir=args.cachedir)

        if sourcePathLocks is not None:
            releaseSourcePathLocks(sourcePathLocks)

        # export MVTs unless some import failed, or nothing changed and the
        # target directory already exists
        export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None }
        if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()):
            pass
        elif len(export_layers) == 0:
            logging.warning('--mvtdir option used but no layer has a publication definition')
        elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers)
              and args.mvtdir.is_dir()):
            logging.info('Skipping MVT export for group %s (no changes)',
                         ', '.join(args.groupname) if args.groupname is not None else '*')
        else:
            exportMVT(dso,
                      layers=export_layers,
                      sources=parse_config_dl(config.get('downloads', [])),
                      license_info=config.get('license-info', {}),
                      last_modified=last_modified,
                      dst=args.mvtdir,
                      default_options=config.get('vector-tiles', None),
                      compress=args.mvt_compress,
                      compress_metadata=args.metadata_compress)

        if dsoTransaction:
            dsoTransaction = False
            logging.debug('Committing transaction')
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
                rv = 1

    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
            logging.exception('Exception occured within transaction, rolling back')
            try:
                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                    logging.error('Could not rollback transaction')
            except Exception: # pylint: disable=broad-exception-caught
                logging.exception('Could not rollback transaction')
        else:
            traceback.print_exc()
        sys.exit(1)

    finally:
        # drop references so the GDAL objects are released in a
        # deterministic order before interpreter shutdown
        lyr_cache = None
        dso = None
        extent = None
        srs = None
    sys.exit(rv)
-
-gdal.UseExceptions()
-main()