#!/usr/bin/python3

#----------------------------------------------------------------------
# Import Lantmäteriet's Topografi Vektor products.
# Copyright © 2026 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import zipfile
import tempfile
from typing import Any, Final, NoReturn, Optional
from pathlib import Path
import sys
import traceback
from time import monotonic as time_monotonic

from osgeo import gdal, ogr, osr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    OF_READONLY as GDAL_OF_READONLY,
    OF_VECTOR as GDAL_OF_VECTOR,
    OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR,
)

import common
from common import BadConfiguration, getSourcePathLockFileName
from common_gdal import getSRS, getExtent, gdalSetOpenExArgs
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    importLayer,
    clearLayer,
    clusterLayer,
)


def get_layer_schema(layer : ogr.Layer) -> dict[str, Any]:
    """Return the layer definition dictionary for the given OGR layer.

    The returned dictionary holds the geometry type and a list of field
    definitions in the format expected by createOutputLayer() /
    validateOutputLayer().  A few well-known Lantmäteriet fields get
    stricter constraints than the source advertises:

    - 'objektidentitet', 'skapad', 'objekttypnr' and 'objekttyp' are
      forced NOT NULL;
    - 'lanskod' (layer 'lansyta') resp. 'kommunkod' (layer 'kommunyta')
      become unique NOT NULL smallint columns;
    - a string 'objektidentitet' is narrowed to the UUID subtype.

    Raises NotImplementedError when the layer doesn't have exactly one
    geometry field, or when that field is ignored.
    """
    schema = {}
    layer_name = layer.GetName()
    defn = layer.GetLayerDefn()

    n = defn.GetGeomFieldCount()
    if n != 1:
        raise NotImplementedError(f'Layer "{layer_name}" has geometry field count {n} != 1')
    geom_fld = defn.GetGeomFieldDefn(0)
    if geom_fld.IsIgnored():
        raise NotImplementedError(f'Geometry field #0 in layer "{layer_name}" is ignored')
    schema['geometry-type'] = geom_fld.GetType()

    layer_name_lower = layer_name.lower()
    fields = schema['fields'] = []
    for i in range(defn.GetFieldCount()):
        fld = defn.GetFieldDefn(i)
        fld_name = fld.GetName()
        fld_name_lower = fld_name.lower()
        if fld.IsIgnored():
            # skip ignored fields entirely: they can't be read anyway
            logging.warning('Field #%d "%s" in source layer "%s" is ignored',
                            i, fld_name, layer_name)
            continue
        field = { 'Name': fld_name }
        fields.append(field)

        if fld_name_lower in ('objektidentitet', 'skapad', 'objekttypnr', 'objekttyp'):
            # mandatory in the Lantmäteriet data model even when the source
            # driver doesn't flag them as such
            field['Nullable'] = False
        else:
            field['Nullable'] = fld.IsNullable()
        field['Unique'] = fld.IsUnique()
        field['AlternativeName'] = fld.GetAlternativeName()
        field['Comment'] = fld.GetComment()

        fld_type = field['Type'] = fld.GetType()
        if (layer_name_lower, fld_name_lower) in (('lansyta', 'lanskod'),
                                                  ('kommunyta', 'kommunkod')):
            # county resp. municipality codes are small unique integer keys
            fld_type = field['Type'] = ogr.OFTInteger
            field['SubType'] = ogr.OFSTInt16
            field['Nullable'] = False
            field['Unique'] = True
        elif fld_type == ogr.OFTString and fld_name_lower == 'objektidentitet':
            field['SubType'] = ogr.OFSTUUID
        else:
            field['SubType'] = fld.GetSubType()

        field['Width'] = fld.GetWidth()
        field['Precision'] = fld.GetPrecision()
        field['Justify'] = fld.GetJustify()
        if fld_type in (ogr.OFTTime, ogr.OFTDate, ogr.OFTDateTime):
            field['TZFlag'] = fld.GetTZFlag()

        # TODO fail if generated or has default
        #print(fld.GetDefault(), fld.IsDefaultDriverSpecific())
        #print(fld.GetDomainName())
        #print(fld.IsGenerated())
    return schema


def get_output_layer(layer_src : ogr.Layer,
                     dso : gdal.Dataset,
                     layername_dst : str,
                     srs : Optional[osr.SpatialReference] = None,
                     create_layer_options : Optional[dict[str,Any]] = None) -> ogr.Layer:
    """Get the destination layer to mirror the source to.

    It is automatically created if needs be.  In both cases the layer is
    validated against the schema derived from the source layer before
    being returned.
    """
    layer_dst = dso.GetLayerByName(layername_dst)
    schema = get_layer_schema(layer_src)
    if layer_dst is None:
        # layer creation options are passed as a list of KEY=VALUE strings
        schema['options'] = options = []
        if create_layer_options is not None:
            options += [ k + '=' + str(v) for k, v in create_layer_options.items() ]
        layer_dst = createOutputLayer(dso, layername_dst, srs=srs, options=schema)
    validateOutputLayer(layer_dst, srs=srs, options=schema)
    return layer_dst


# (schema, layer name) pairs already imported in this run, used to detect
# duplicate layers across source files
LAYER_NAMES : Final[set[tuple[str|None,str]]] = set()

def import_layer(layer : ogr.Layer,
                 dso : gdal.Dataset,
                 filename : str,
                 srs : Optional[osr.SpatialReference] = None,
                 schema : Optional[str] = None,
                 create_layer_options : Optional[dict[str,Any]] = None) -> None:
    """Import a given layer to PostGIS.

    The destination layer (named after the source layer, optionally
    qualified with the given schema) is truncated before the import, and
    clustered on its geometry column afterwards.

    Raises RuntimeError when the same (schema, layer name) pair was
    already imported during this run.
    """
    layer_name = layer.GetName()
    k = (schema, layer_name)
    if k in LAYER_NAMES:
        raise RuntimeError(f'Duplicate layer "{layer_name}" in schema {schema}')
    LAYER_NAMES.add(k)

    logging.info('Importing layer %s from %s to schema %s', layer_name, filename, schema)
    layer_dst = get_output_layer(layer, dso=dso,
                                 layername_dst=layer_name if schema is None
                                     else schema + '.' + layer_name,
                                 srs=srs,
                                 create_layer_options=create_layer_options)

    # best-effort: propagate the source layer's description metadata
    description = layer.GetMetadataItem('DESCRIPTION')
    if (description is not None and
            layer_dst.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
        logging.warning('Could not set description metadata')

    clearLayer(layer_dst, identity='RESTART IDENTITY')

    defn_dst = layer_dst.GetLayerDefn()
    field_map = { defn_dst.GetFieldDefn(i).GetName() : i
                  for i in range(defn_dst.GetFieldCount()) }
    importLayer(layer_dst, layer, args={'field-map': field_map}, extent=None)

    if layer_dst.GetLayerDefn().GetGeomType() != ogr.wkbNone:
        clusterLayer(layer_dst, column_name=layer_dst.GetGeometryColumn())


def guess_schema_from_file(path : str) -> str:
    """Infer PostgreSQL schema name from the filename of the .zip source.

    Maps e.g. 'topo50.zip' to 'lm_topo50', with the special case
    'topo1m.zip' mapping to 'lm_topo1000'.  Raises RuntimeError when the
    filename stem is not a known product name.
    """
    stem = Path(path).stem
    topomap = { 'topo1m': 'topo1000' }
    for l in (1000, 250, 100, 50, 10):
        l = 'topo' + str(l)
        topomap[l] = l
    try:
        return 'lm_' + topomap[stem.lower()]
    except KeyError as e:
        raise RuntimeError(f'Could not guess schema name from input filename {path}') from e


def import_source(path : str,
                  dso : gdal.Dataset,
                  srs : Optional[osr.SpatialReference] = None,
                  schema : Optional[str] = None,
                  create_layer_options : Optional[dict[str,Any]] = None,
                  lockdir_fd: int|None = None) -> tuple[bool,int]:
    """Import a single topo$FOO.zip source to PostGIS transactionally.

    When lockdir_fd is given, a shared flock(2) on the source's lock
    file is held for the duration of the import.  When the output
    dataset supports transactions, the whole import is wrapped in one
    and rolled back on failure.

    Returns a (success, layer count) pair; all exceptions raised during
    the import are caught and reported as success=False.
    """
    if schema is None:
        schema = guess_schema_from_file(path)
    if lockdir_fd is None:
        lock_fd = None
    else:
        lockfile = getSourcePathLockFileName(path)
        lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                          dir_fd=lockdir_fd)

    dso_transaction = False
    n = 0
    try:
        if lock_fd is not None:
            logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                          path, lockfile)
            flock(lock_fd, LOCK_SH)

        if dso.TestCapability(ogr.ODsCTransactions):
            logging.debug('Starting transaction')
            dso_transaction = dso.StartTransaction() == ogr.OGRERR_NONE

        n = import_source2(path, dso, srs=srs, schema=schema,
                           create_layer_options=create_layer_options)

        if dso_transaction:
            logging.debug('Committing transaction')
            dso_transaction = False
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
        return True, n

    except Exception: # pylint: disable=broad-exception-caught
        traceback.print_exc()
        if dso_transaction:
            logging.debug('Rolling back transaction')
            # no need to catch the exception here
            # (was CommitTransaction(), which would have persisted the
            # partial import this branch is meant to discard)
            if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not rollback transaction')

    finally:
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockfile for %s', path)
    return False, n


def import_source2(path : str,
                   dso : gdal.Dataset,
                   srs : Optional[osr.SpatialReference] = None,
                   schema : Optional[str] = None,
                   create_layer_options : Optional[dict[str,Any]] = None) -> int:
    """Import a single shape file, or recursively all shape files
    contained in a .zip file, to PostGIS.

    Zip members are extracted one at a time into a temporary directory
    (nested zips are recursed into); the metadata member 'uttag.json' is
    skipped.  Returns the number of layers imported.

    Raises NotImplementedError on zipped directories, and RuntimeError
    when GDAL can't open the source.
    """
    n = 0
    if path.lower().endswith('.zip') and zipfile.is_zipfile(path):
        logging.debug('Opening %s as ZipFile', path)
        with zipfile.ZipFile(path, mode='r') as z:
            for zi in z.infolist():
                if zi.is_dir():
                    raise NotImplementedError(f'{zi.filename}: Zipped directories are '
                                              'not supported')
                if zi.filename == 'uttag.json':
                    continue
                with tempfile.TemporaryDirectory() as tmpdir:
                    if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
                        logging.debug('Extracting %s from %s into %s/',
                                      zi.filename, path, tmpdir.removesuffix('/'))
                    elif not zi.filename.lower().endswith('.zip'):
                        # at INFO level only announce leaf (non-zip) members
                        logging.info('Extracting %s', zi.filename)
                    p = z.extract(zi, path=tmpdir)
                    n += import_source2(p, dso=dso, srs=srs, schema=schema,
                                        create_layer_options=create_layer_options)
        return n

    kwargs, _ = gdalSetOpenExArgs({}, flags=GDAL_OF_VECTOR|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
    logging.debug('OpenEx(%s, %s)', path, str(kwargs))
    ds = gdal.OpenEx(path, **kwargs)
    if ds is None:
        raise RuntimeError(f'Could not open {path}')
    n = ds.GetLayerCount()
    logging.debug('Opened %s (driver %s, %d layers)', path,
                  ds.GetDriver().ShortName, n)
    filename = Path(path).name
    for i in range(n):
        import_layer(layer=ds.GetLayerByIndex(i), dso=dso, filename=filename,
                     srs=srs, schema=schema,
                     create_layer_options=create_layer_options)
    return n


# pylint: disable-next=missing-function-docstring
def main() -> NoReturn:
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
    parser = argparse.ArgumentParser(description='Import Lantmäteriets Topografi products.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0, help=argparse.SUPPRESS)
    parser.add_argument('--schema', default=None, help='PostgreSQL schema name')
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files to source paths')
    parser.add_argument('input', nargs='+', metavar='topo.zip',
                        help='zipfile(s) to process')
    args = parser.parse_args()

    if args.debug > 0:
        # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config()

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    # set global GDAL/OGR configuration options
    # pylint: disable=duplicate-code
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    _extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)
    # pylint: enable=duplicate-code

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)
    if args.lockdir_sources is None:
        umask = lockdir_fd = None
    else:
        # group-writable lock files so other members can take the lock too
        umask = os.umask(0o002)
        lockdir_fd = os.open(args.lockdir_sources,
                             O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)

    rv = 0
    try:
        for path in args.input:
            start = time_monotonic()
            b, n = import_source(path if args.cachedir is None
                                     else str(args.cachedir.joinpath(path)),
                                 dso, srs=srs, lockdir_fd=lockdir_fd,
                                 schema=args.schema,
                                 create_layer_options=config['dataset'].get(
                                     'create-layer-options', None))
            if not b:
                rv = 1
            logging.info('Imported %d layers from %s in %s', n, path,
                         common.format_time(time_monotonic() - start))
    finally:
        if lockdir_fd is not None:
            try:
                os.close(lockdir_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockdir')
        if umask is not None:
            os.umask(umask)
    sys.exit(rv)


if __name__ == '__main__':
    # guard added so the module can be imported without side effects
    gdal.UseExceptions()
    main()