diff options
Diffstat (limited to 'geodata-import-topo')
| -rwxr-xr-x | geodata-import-topo | 341 |
1 file changed, 341 insertions, 0 deletions
#!/usr/bin/python3

#----------------------------------------------------------------------
# Import Lantmäteriet's Topografi Vektor products.
# Copyright © 2026 Guilhem Moulin <guilhem@fripost.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import zipfile
import tempfile
from typing import Any, Final, NoReturn, Optional
from pathlib import Path
import sys
import traceback
from time import monotonic as time_monotonic

from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    OF_READONLY as GDAL_OF_READONLY,
    OF_VECTOR as GDAL_OF_VECTOR,
    OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR,
)

import common
from common import BadConfiguration, getSourcePathLockFileName
from common_gdal import getSRS, getExtent, gdalSetOpenExArgs
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    importLayer,
    clearLayer,
    clusterLayer,
)

def get_layer_schema(layer : ogr.Layer) -> dict[str, Any]:
    """Return the layer definition dictionary for the given OGR layer.

    The dictionary holds the geometry type plus one entry per non-ignored
    attribute field (name, nullability, type/subtype, width, etc.), in the
    shape expected by createOutputLayer()/validateOutputLayer().

    Raises NotImplementedError when the layer does not have exactly one
    geometry field, or when that geometry field is ignored.
    """
    schema = {}
    defn = layer.GetLayerDefn()
    n = defn.GetGeomFieldCount()
    if n != 1:
        raise NotImplementedError(f'Layer "{layer.GetName()}" has geometry field count {n} != 1')
    geom_fld = defn.GetGeomFieldDefn(0)
    if geom_fld.IsIgnored():
        raise NotImplementedError(f'Geometry field #0 in layer "{layer.GetName()}" is ignored')
    schema['geometry-type'] = geom_fld.GetType()

    fields = schema['fields'] = []
    for i in range(defn.GetFieldCount()):
        fld = defn.GetFieldDefn(i)
        fld_name = fld.GetName()
        if fld.IsIgnored():
            logging.warning('Field #%d "%s" in source layer "%s" is ignored',
                            i, fld_name, layer.GetName())
            continue
        field = { 'Name': fld_name }
        fields.append(field)

        # These Lantmäteriet attributes are always present on every object,
        # so force NOT NULL on the output regardless of what the source
        # driver reports.
        if fld_name.lower() in ('objektidentitet', 'skapad', 'objekttypnr', 'objekttyp'):
            field['Nullable'] = False
        else:
            field['Nullable'] = fld.IsNullable()
        field['Unique'] = fld.IsUnique()

        field['AlternativeName'] = fld.GetAlternativeName()
        field['Comment'] = fld.GetComment()

        fld_type = field['Type'] = fld.GetType()
        # 'objektidentitet' is a UUID; tighten the string subtype so the
        # output column gets the proper type.
        if fld_type == ogr.OFTString and fld_name.lower() == 'objektidentitet':
            field['SubType'] = ogr.OFSTUUID
        else:
            field['SubType'] = fld.GetSubType()
        field['Width'] = fld.GetWidth()
        field['Precision'] = fld.GetPrecision()
        field['Justify'] = fld.GetJustify()
        if fld_type in (ogr.OFTTime, ogr.OFTDate, ogr.OFTDateTime):
            field['TZFlag'] = fld.GetTZFlag()

        # TODO fail if generated or has default
        #print(fld.GetDefault(), fld.IsDefaultDriverSpecific())
        #print(fld.GetDomainName())
        #print(fld.IsGenerated())

    return schema


def get_output_layer(layer_src : ogr.Layer, dso : gdal.Dataset,
                     layername_dst : str,
                     srs : Optional[osr.SpatialReference] = None,
                     create_layer_options : Optional[dict[str,Any]] = None) -> ogr.Layer:
    """Get the destination layer to mirror the source to.  It is
    automatically created if needs be, and validated against the source
    layer's schema in either case."""
    layer_dst = dso.GetLayerByName(layername_dst)
    schema = get_layer_schema(layer_src)
    if layer_dst is None:
        schema['options'] = options = []
        if create_layer_options is not None:
            options += [ k + '=' + str(v) for k, v in create_layer_options.items() ]
        layer_dst = createOutputLayer(dso, layername_dst, srs=srs, options=schema)

    validateOutputLayer(layer_dst, srs=srs, options=schema)

    return layer_dst


# (schema, layer name) pairs imported so far; used to detect duplicate
# layer names across source files within a single run
LAYER_NAMES : Final[set[tuple[str|None,str]]] = set()
def import_layer(layer : ogr.Layer, dso : gdal.Dataset, filename : str,
                 srs : Optional[osr.SpatialReference] = None,
                 schema : Optional[str] = None,
                 create_layer_options : Optional[dict[str,Any]] = None) -> None:
    """Import a given layer to PostGIS.

    The destination layer (named after the source layer, optionally
    qualified with the target schema) is created if missing, truncated,
    repopulated from the source, and finally clustered on its geometry
    column.

    Raises RuntimeError if the same (schema, layer name) pair was already
    imported during this run.
    """
    layer_name = layer.GetName()
    k = (schema, layer_name)
    if k in LAYER_NAMES:
        raise RuntimeError(f'Duplicate layer "{layer_name}" in schema {schema}')
    LAYER_NAMES.add(k)
    logging.info('Importing layer %s from %s to schema %s',
                 layer_name, filename, schema)

    layer_dst = get_output_layer(layer, dso=dso,
                                 layername_dst=layer_name if schema is None else
                                     schema + '.' + layer_name,
                                 srs=srs,
                                 create_layer_options=create_layer_options)

    # propagate the source layer's description (best effort)
    description = layer.GetMetadataItem('DESCRIPTION')
    if (description is not None and
            layer_dst.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
        logging.warning('Could not set description metadata')

    clearLayer(layer_dst, identity='RESTART IDENTITY')
    defn_dst = layer_dst.GetLayerDefn()
    field_map = { defn_dst.GetFieldDefn(i).GetName() : i for i in range(defn_dst.GetFieldCount()) }
    importLayer(layer_dst, layer, args={'field-map': field_map}, extent=None)

    if layer_dst.GetLayerDefn().GetGeomType() != ogr.wkbNone:
        clusterLayer(layer_dst, column_name=layer_dst.GetGeometryColumn())

def guess_schema_from_file(path : str) -> str:
    """Infer PostgreSQL schema name from the filename of the .zip source.

    E.g. "topo50.zip" → "lm_topo50"; "topo1m.zip" → "lm_topo1000".
    Raises RuntimeError when the stem is not a known Topografi product.
    """
    stem = Path(path).stem
    topomap = { 'topo1m': 'topo1000' }
    for l in (1000, 250, 100, 50, 10):
        l = 'topo' + str(l)
        topomap[l] = l
    try:
        return 'lm_' + topomap[stem.lower()]
    except KeyError as e:
        raise RuntimeError(f'Could not guess schema name from input filename {path}') from e

def import_source(path : str, dso : gdal.Dataset,
                  srs : Optional[osr.SpatialReference] = None,
                  schema : Optional[str] = None,
                  create_layer_options : Optional[dict[str,Any]] = None,
                  lockdir_fd: int|None = None) -> tuple[bool,int]:
    """Import a single topo$FOO.zip source to PostGIS transactionally.

    Returns (success, number-of-layers-imported).  When lockdir_fd is
    given, a shared lock on the source's lock file is held for the
    duration of the import so a concurrent downloader can't replace the
    file midway.  Failures are logged (not re-raised) and reported via
    the boolean return value.
    """
    if schema is None:
        schema = guess_schema_from_file(path)
    if lockdir_fd is None:
        lock_fd = None
    else:
        lockfile = getSourcePathLockFileName(path)
        lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                          dir_fd=lockdir_fd)
    dso_transaction = False
    n = 0
    try:
        if lock_fd is not None:
            logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                          path, lockfile)
            flock(lock_fd, LOCK_SH)
        if dso.TestCapability(ogr.ODsCTransactions):
            logging.debug('Starting transaction')
            dso_transaction = dso.StartTransaction() == ogr.OGRERR_NONE
        n = import_source2(path, dso, srs=srs, schema=schema,
                           create_layer_options=create_layer_options)
        if dso_transaction:
            logging.debug('Committing transaction')
            dso_transaction = False
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
        return True, n
    except Exception: # pylint: disable=broad-exception-caught
        traceback.print_exc()
        if dso_transaction:
            logging.debug('Rolling back transaction')
            # no need to catch the exception here
            # BUGFIX: was CommitTransaction(), which would have *committed*
            # the partial import instead of discarding it
            if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not rollback transaction')
    finally:
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockfile for %s', path)
    return False, n

def import_source2(path : str, dso : gdal.Dataset,
                   srs : Optional[osr.SpatialReference] = None,
                   schema : Optional[str] = None,
                   create_layer_options : Optional[dict[str,Any]] = None) -> int:
    """Import a single shape file, or recursively all shape files
    contained in a .zip file, to PostGIS.  Returns the number of layers
    imported."""
    n = 0
    if path.lower().endswith('.zip') and zipfile.is_zipfile(path):
        logging.debug('Opening %s as ZipFile', path)
        with zipfile.ZipFile(path, mode='r') as z:
            for zi in z.infolist():
                if zi.is_dir():
                    raise NotImplementedError(f'{zi.filename}: Zipped directories are '
                                              'not supported')
                # skip the extraction manifest
                if zi.filename == 'uttag.json':
                    continue
                # extract each member to a throw-away directory and recurse
                # (members may themselves be .zip files)
                with tempfile.TemporaryDirectory() as tmpdir:
                    if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
                        logging.debug('Extracting %s from %s into %s/',
                                      zi.filename, path, tmpdir.removesuffix('/'))
                    elif not zi.filename.lower().endswith('.zip'):
                        logging.info('Extracting %s', zi.filename)
                    p = z.extract(zi, path=tmpdir)
                    n += import_source2(p, dso=dso, srs=srs, schema=schema,
                                        create_layer_options=create_layer_options)
        return n

    kwargs, _ = gdalSetOpenExArgs({}, flags=GDAL_OF_VECTOR|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
    logging.debug('OpenEx(%s, %s)', path, str(kwargs))
    ds = gdal.OpenEx(path, **kwargs)
    if ds is None:
        raise RuntimeError(f'Could not open {path}')

    n = ds.GetLayerCount()
    logging.debug('Opened %s (driver %s, %d layers)', path, ds.GetDriver().ShortName, n)
    filename = Path(path).name
    for i in range(n):
        import_layer(layer=ds.GetLayerByIndex(i), dso=dso, filename=filename,
                     srs=srs, schema=schema,
                     create_layer_options=create_layer_options)
    return n


# pylint: disable-next=missing-function-docstring
def main() -> NoReturn:
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Import Lantmäteriets Topografi products.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--schema', default=None, help='PostgreSQL schema name')
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files to source paths')
    parser.add_argument('input', nargs='+', metavar='topo.zip', help='zipfile(s) to process')
    args = parser.parse_args()

    if args.debug > 0: # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config()

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    # set global GDAL/OGR configuration options
    # pylint: disable=duplicate-code
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    _extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)
    # pylint: enable=duplicate-code

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)

    if args.lockdir_sources is None:
        umask = lockdir_fd = None
    else:
        # loosen the umask so lock files are group-writable
        umask = os.umask(0o002)
        lockdir_fd = os.open(args.lockdir_sources, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
    rv = 0
    try:
        for path in args.input:
            start = time_monotonic()
            b, n = import_source(path if args.cachedir is None
                                      else str(args.cachedir.joinpath(path)),
                                 dso, srs=srs, lockdir_fd=lockdir_fd,
                                 schema=args.schema,
                                 create_layer_options=config['dataset'].get('create-layer-options',
                                                                            None))
            if not b:
                rv = 1
            logging.info('Imported %d layers from %s in %s', n, path,
                         common.format_time(time_monotonic() - start))
    finally:
        if lockdir_fd is not None:
            try:
                os.close(lockdir_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockdir')
        if umask is not None:
            os.umask(umask)
    sys.exit(rv)

gdal.UseExceptions()
main()
