aboutsummaryrefslogtreecommitdiffstats
path: root/geodata-import-topo
diff options
context:
space:
mode:
Diffstat (limited to 'geodata-import-topo')
-rwxr-xr-xgeodata-import-topo341
1 files changed, 341 insertions, 0 deletions
diff --git a/geodata-import-topo b/geodata-import-topo
new file mode 100755
index 0000000..d814771
--- /dev/null
+++ b/geodata-import-topo
@@ -0,0 +1,341 @@
+#!/usr/bin/python3
+
+#----------------------------------------------------------------------
+# Import Lantmäteriet's Topografi Vektor products.
+# Copyright © 2026 Guilhem Moulin <guilhem@fripost.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#----------------------------------------------------------------------
+
+# pylint: disable=invalid-name, missing-module-docstring, fixme
+
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
+import os
+from fcntl import flock, LOCK_EX, LOCK_SH
+import logging
+import argparse
+import zipfile
+import tempfile
+from typing import Any, Final, NoReturn, Optional
+from pathlib import Path
+import sys
+import traceback
+from time import monotonic as time_monotonic
+
+from osgeo import gdal, ogr, osr
+from osgeo.gdalconst import (
+ CE_None as GDAL_CE_None,
+ OF_READONLY as GDAL_OF_READONLY,
+ OF_VECTOR as GDAL_OF_VECTOR,
+ OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR,
+)
+
+import common
+from common import BadConfiguration, getSourcePathLockFileName
+from common_gdal import getSRS, getExtent, gdalSetOpenExArgs
+from import_source import (
+ openOutputDS,
+ createOutputLayer,
+ validateOutputLayer,
+ importLayer,
+ clearLayer,
+ clusterLayer,
+)
+
def get_layer_schema(layer : ogr.Layer) -> dict[str, Any]:
    """Return the layer definition dictionary for the given OGR layer.

    The dictionary holds the geometry type and a list of per-field
    definitions suitable for creating/validating the output layer."""
    defn = layer.GetLayerDefn()
    geom_field_count = defn.GetGeomFieldCount()
    if geom_field_count != 1:
        raise NotImplementedError(
            f'Layer "{layer.GetName()}" has geometry field count {geom_field_count} != 1')
    geom_fld = defn.GetGeomFieldDefn(0)
    if geom_fld.IsIgnored():
        raise NotImplementedError(f'Geometry field #0 in layer "{layer.GetName()}" is ignored')

    schema = { 'geometry-type': geom_fld.GetType(), 'fields': [] }
    fields = schema['fields']

    # field names forced NOT NULL in the output regardless of the source
    # declaration (well-known Lantmäteriet attribute names, presumably
    # always populated — TODO confirm against the product specification)
    forced_not_null = ('objektidentitet', 'skapad', 'objekttypnr', 'objekttyp')

    for idx in range(defn.GetFieldCount()):
        fld = defn.GetFieldDefn(idx)
        fld_name = fld.GetName()
        if fld.IsIgnored():
            logging.warning('Field #%d "%s" in source layer "%s" is ignored',
                            idx, fld_name, layer.GetName())
            continue

        field = { 'Name': fld_name }
        if fld_name.lower() in forced_not_null:
            field['Nullable'] = False
        else:
            field['Nullable'] = fld.IsNullable()
            field['Unique'] = fld.IsUnique()

        field['AlternativeName'] = fld.GetAlternativeName()
        field['Comment'] = fld.GetComment()

        fld_type = field['Type'] = fld.GetType()
        if fld_type == ogr.OFTString and fld_name.lower() == 'objektidentitet':
            # string-typed object identifiers are tagged as UUIDs
            field['SubType'] = ogr.OFSTUUID
        else:
            field['SubType'] = fld.GetSubType()
        field['Width'] = fld.GetWidth()
        field['Precision'] = fld.GetPrecision()
        field['Justify'] = fld.GetJustify()
        if fld_type in (ogr.OFTTime, ogr.OFTDate, ogr.OFTDateTime):
            field['TZFlag'] = fld.GetTZFlag()

        # TODO fail if generated or has default
        #print(fld.GetDefault(), fld.IsDefaultDriverSpecific())
        #print(fld.GetDomainName())
        #print(fld.IsGenerated())

        fields.append(field)

    return schema
+
+
def get_output_layer(layer_src : ogr.Layer, dso : gdal.Dataset,
                     layername_dst : str,
                     srs : Optional[osr.SpatialReference] = None,
                     create_layer_options : Optional[dict[str,Any]] = None) -> ogr.Layer:
    """Return the destination layer mirroring *layer_src*, creating it
    in *dso* first when it does not exist yet.  The existing (or new)
    layer is always validated against the source schema."""
    layer_dst = dso.GetLayerByName(layername_dst)
    schema = get_layer_schema(layer_src)
    if layer_dst is None:
        # layer creation options are passed as KEY=VALUE strings
        creation_options = []
        if create_layer_options is not None:
            creation_options.extend(f'{key}={val}'
                                    for key, val in create_layer_options.items())
        schema['options'] = creation_options
        layer_dst = createOutputLayer(dso, layername_dst, srs=srs, options=schema)

    validateOutputLayer(layer_dst, srs=srs, options=schema)
    return layer_dst
+
+
# (schema, layer name) pairs already imported, used to reject duplicates
LAYER_NAMES : Final[set[tuple[str|None,str]]] = set()
def import_layer(layer : ogr.Layer, dso : gdal.Dataset, filename : str,
                 srs : Optional[osr.SpatialReference] = None,
                 schema : Optional[str] = None,
                 create_layer_options : Optional[dict[str,Any]] = None) -> None:
    """Import a given layer to PostGIS."""
    layer_name = layer.GetName()
    key = (schema, layer_name)
    if key in LAYER_NAMES:
        raise RuntimeError(f'Duplicate layer "{layer_name}" in schema {schema}')
    LAYER_NAMES.add(key)
    logging.info('Importing layer %s from %s to schema %s', layer_name, filename, schema)

    # destination layer name is optionally schema-qualified
    layername_dst = layer_name if schema is None else schema + '.' + layer_name
    layer_dst = get_output_layer(layer, dso=dso, layername_dst=layername_dst,
                                 srs=srs, create_layer_options=create_layer_options)

    # propagate the source layer description, best-effort
    description = layer.GetMetadataItem('DESCRIPTION')
    if description is not None:
        if layer_dst.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None:
            logging.warning('Could not set description metadata')

    # truncate the destination (resetting its identity sequence) before import
    clearLayer(layer_dst, identity='RESTART IDENTITY')
    defn_dst = layer_dst.GetLayerDefn()
    field_map = {}
    for idx in range(defn_dst.GetFieldCount()):
        field_map[defn_dst.GetFieldDefn(idx).GetName()] = idx
    importLayer(layer_dst, layer, args={'field-map': field_map}, extent=None)

    # cluster spatial layers on their geometry column
    if layer_dst.GetLayerDefn().GetGeomType() != ogr.wkbNone:
        clusterLayer(layer_dst, column_name=layer_dst.GetGeometryColumn())
+
def guess_schema_from_file(path : str) -> str:
    """Infer the PostgreSQL schema name from the filename of the .zip source.

    Known stems are topo10/topo50/topo100/topo250/topo1000 (plus the
    alias topo1m for topo1000), matched case-insensitively; the schema
    name is the stem prefixed with 'lm_'.  Raises RuntimeError for any
    other filename."""
    known = { f'topo{scale}': f'topo{scale}' for scale in (1000, 250, 100, 50, 10) }
    known['topo1m'] = 'topo1000'  # 1:1M product lives in the topo1000 schema
    stem = Path(path).stem.lower()
    try:
        return 'lm_' + known[stem]
    except KeyError as exc:
        raise RuntimeError(f'Could not guess schema name from input filename {path}') from exc
+
def import_source(path : str, dso : gdal.Dataset,
                  srs : Optional[osr.SpatialReference] = None,
                  schema : Optional[str] = None,
                  create_layer_options : Optional[dict[str,Any]] = None,
                  lockdir_fd: int|None = None) -> tuple[bool,int]:
    """Import a single topo$FOO.zip source to PostGIS transactionally.

    Returns a (success, layer_count) tuple; failures are logged (with a
    traceback) rather than propagated.  When lockdir_fd is given, a
    shared flock on the per-source lock file is held while importing."""
    if schema is None:
        schema = guess_schema_from_file(path)
    if lockdir_fd is None:
        lock_fd = None
    else:
        lockfile = getSourcePathLockFileName(path)
        lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
                          dir_fd=lockdir_fd)
    dso_transaction = False
    n = 0
    try:
        if lock_fd is not None:
            logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
                          path, lockfile)
            flock(lock_fd, LOCK_SH)
        if dso.TestCapability(ogr.ODsCTransactions):
            logging.debug('Starting transaction')
            dso_transaction = dso.StartTransaction() == ogr.OGRERR_NONE
        n = import_source2(path, dso, srs=srs, schema=schema,
                           create_layer_options=create_layer_options)
        if dso_transaction:
            logging.debug('Committing transaction')
            # clear the flag first so a failing commit isn't "rolled back"
            dso_transaction = False
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
        return True, n
    except Exception: # pylint: disable=broad-exception-caught
        traceback.print_exc()
        if dso_transaction:
            logging.debug('Rolling back transaction')
            # no need to catch the exception here
            # BUG FIX: this used to call CommitTransaction(), which would
            # have persisted the partial import instead of discarding it
            if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not rollback transaction')
    finally:
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockfile for %s', path)
    return False, n
+
def import_source2(path : str, dso : gdal.Dataset,
                   srs : Optional[osr.SpatialReference] = None,
                   schema : Optional[str] = None,
                   create_layer_options : Optional[dict[str,Any]] = None) -> int:
    """Import a single shape file — or, recursively, every file contained
    in a .zip archive — to PostGIS.  Returns the number of layers imported."""
    if path.lower().endswith('.zip') and zipfile.is_zipfile(path):
        logging.debug('Opening %s as ZipFile', path)
        count = 0
        with zipfile.ZipFile(path, mode='r') as z:
            for member in z.infolist():
                if member.is_dir():
                    raise NotImplementedError(f'{member.filename}: Zipped directories are '
                                              'not supported')
                if member.filename == 'uttag.json':
                    # presumably delivery metadata — deliberately skipped
                    continue
                # each member is extracted into its own throwaway directory
                with tempfile.TemporaryDirectory() as tmpdir:
                    if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
                        logging.debug('Extracting %s from %s into %s/',
                                      member.filename, path, tmpdir.removesuffix('/'))
                    elif not member.filename.lower().endswith('.zip'):
                        # at INFO level nested archives are logged by the
                        # recursive call instead
                        logging.info('Extracting %s', member.filename)
                    extracted = z.extract(member, path=tmpdir)
                    count += import_source2(extracted, dso=dso, srs=srs, schema=schema,
                                            create_layer_options=create_layer_options)
        return count

    kwargs, _ = gdalSetOpenExArgs({}, flags=GDAL_OF_VECTOR|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
    logging.debug('OpenEx(%s, %s)', path, str(kwargs))
    ds = gdal.OpenEx(path, **kwargs)
    if ds is None:
        raise RuntimeError(f'Could not open {path}')

    layer_count = ds.GetLayerCount()
    logging.debug('Opened %s (driver %s, %d layers)',
                  path, ds.GetDriver().ShortName, layer_count)
    filename = Path(path).name
    for idx in range(layer_count):
        import_layer(layer=ds.GetLayerByIndex(idx), dso=dso, filename=filename,
                     srs=srs, schema=schema,
                     create_layer_options=create_layer_options)
    return layer_count
+
+
# pylint: disable-next=missing-function-docstring
def main() -> NoReturn:
    # Command-line entry point: parse arguments and the shared configuration,
    # open the output dataset, then import each given source archive in turn.
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Import Lantmäteriets Topografi products.')
    parser.add_argument('--cachedir', default=None,
                        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--schema', default=None, help='PostgreSQL schema name')
    parser.add_argument('--lockfile', default=None,
                        help='obtain an exclusive lock before processing')
    parser.add_argument('--lockdir-sources', default=None,
                        help='optional directory for lock files to source paths')
    parser.add_argument('input', nargs='+', metavar='topo.zip', help='zipfile(s) to process')
    args = parser.parse_args()

    # -d enables our own debug logging, -dd additionally GDAL's
    if args.debug > 0: # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config()

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    # set global GDAL/OGR configuration options
    # pylint: disable=duplicate-code
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # get configured Spatial Reference System and extent
    # (the extent is only validated here, not otherwise used)
    srs = getSRS(config.get('SRS', None))
    _extent = getExtent(config.get('extent', None), srs=srs)

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)
    # pylint: enable=duplicate-code

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    if args.cachedir is not None:
        args.cachedir = Path(args.cachedir)

    if args.lockdir_sources is None:
        umask = lockdir_fd = None
    else:
        # relax the umask so per-source lock files are group-writable —
        # presumably so other group members can lock them too; TODO confirm
        umask = os.umask(0o002)
        lockdir_fd = os.open(args.lockdir_sources, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
    rv = 0
    try:
        # a failed source sets a non-zero exit status but does not abort
        # the remaining imports
        for path in args.input:
            start = time_monotonic()
            b, n = import_source(path if args.cachedir is None
                                 else str(args.cachedir.joinpath(path)),
                                 dso, srs=srs, lockdir_fd=lockdir_fd,
                                 schema=args.schema,
                                 create_layer_options=config['dataset'].get('create-layer-options',
                                                                            None))
            if not b:
                rv = 1
            logging.info('Imported %d layers from %s in %s', n, path,
                         common.format_time(time_monotonic() - start))
    finally:
        # best-effort cleanup: close the lock directory and restore the umask
        if lockdir_fd is not None:
            try:
                os.close(lockdir_fd)
            except (OSError, ValueError):
                logging.exception('Could not close lockdir')
        if umask is not None:
            os.umask(umask)
    sys.exit(rv)
+
# Make GDAL/OGR raise Python exceptions instead of returning error codes.
gdal.UseExceptions()

# Guard the entry point so the module can be imported (e.g. for testing)
# without side effects.
if __name__ == '__main__':
    main()