#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (extract/import layers)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
import os
import sys
from fcntl import flock, LOCK_EX
import logging
import argparse
import re
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback

from osgeo import gdal, ogr
from osgeo.gdalconst import (
    CE_None as GDAL_CE_None,
    DCAP_DEFAULT_FIELDS as GDAL_DCAP_DEFAULT_FIELDS,
    DCAP_NOTNULL_FIELDS as GDAL_DCAP_NOTNULL_FIELDS,
    DCAP_UNIQUE_FIELDS as GDAL_DCAP_UNIQUE_FIELDS,
)
from osgeo import gdalconst

import common
from common import BadConfiguration, escape_identifier
from common_gdal import (
    gdalVersionMin,
    gdalGetMetadataItem,
    getSRS,
    getExtent,
    parseGeomType,
    parseFieldType,
    parseSubFieldType,
    parseTimeZone
)
from import_source import (
    openOutputDS,
    createOutputLayer,
    validateOutputLayer,
    clearLayer,
    importSources
)

def setFieldIf(cond : bool,
               attrName : str,
               val : Any,
               data : dict[str, Any],
               fldName : str,
               drvName : str,
               log = logging.warning) -> None:
    """Conditionally set a field attribute.

    When `cond` is true, store `val` under `attrName` in `data`.
    Otherwise leave `data` untouched and report, through the `log`
    callable, that the attribute is ignored for field `fldName`
    because the driver `drvName` does not support it."""
    if cond:
        data[attrName] = val
    else:
        # quote strings so the message shows them as they appear in the configuration
        if isinstance(val, str):
            val2 = '"' + val + '"'
        else:
            val2 = str(val)
        log('Ignoring %s=%s on field "%s" (not supported by %s driver)',
            attrName, val2, fldName, drvName)

# pylint: disable-next=too-many-branches
def validate_schema(layers : dict[str, Any],
                    drvo : Optional[gdal.Driver] = None,
                    lco_defaults : Optional[dict[str, str]] = None) -> None:
    """Validate layer creation options and schema.  The schema is
    modified in place with the parsed result.
    (We need the driver of the output dataset to determine capability on
    constraints.)

    `layers` maps output layer names to their definition; only the
    'create' entry of each definition is read and rewritten here.
    `drvo` is the output driver; although it defaults to None it is
    dereferenced unconditionally, so callers must pass a driver.
    `lco_defaults` holds dataset-wide layer creation options which are
    prepended to each layer's own options.

    Raises BadConfiguration on a nameless or duplicate field, or on an
    unrecognized (or wrongly typed) field key."""

    # Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
    if gdalVersionMin(maj=3, min=7):
        # list of capability flags supported by the CreateField() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
        drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
        # GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
        hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
    else:
        # list of flags supported by the OGRLayer::AlterFieldDefn() API
        drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
        drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
        # GetComment()/SetComment() added in 3.7.0
        drvoSupportsFieldComment = False
        hasTZFlagSupport = False

    # cache driver capabilities
    drvoSupportsFieldWidthPrecision = 'WidthPrecision' in drvoFieldDefnFlags
    drvoSupportsFieldNullable = ('Nullable' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_NOTNULL_FIELDS))
    drvoSupportsFieldUnique = ('Unique' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_UNIQUE_FIELDS))
    drvoSupportsFieldDefault = ('Default' in drvoFieldDefnFlags and
        gdalGetMetadataItem(drvo, GDAL_DCAP_DEFAULT_FIELDS))
    drvoSupportsFieldAlternativeName = 'AlternativeName' in drvoFieldDefnFlags

    for layername, layerdef in layers.items():
        create = layerdef.get('create', None)
        if create is None or len(create) < 1:
            logging.warning('Layer "%s" has no creation schema', layername)
            continue

        # prepend global layer creation options (dataset:create-layer-options)
        # and build the option=value list
        lco = create.get('options', None)
        if lco_defaults is not None or lco is not None:
            options = []
            if lco_defaults is not None:
                options += [ k + '=' + str(v) for k, v in lco_defaults.items() ]
            if lco is not None:
                options += [ k + '=' + str(v) for k, v in lco.items() ]
            create['options'] = options

        # parse geometry type
        create['geometry-type'] = parseGeomType(create.get('geometry-type', None))

        fields = create.get('fields', None)
        if fields is None:
            create['fields'] = []
        else:
            fields_set = set()
            for idx, fld_def in enumerate(fields):
                fld_name = fld_def.get('name', None)
                if fld_name is None or fld_name == '':
                    raise BadConfiguration(f'Field #{idx} has no name')
                if fld_name in fields_set:
                    raise BadConfiguration(f'Duplicate field "{fld_name}"')
                fields_set.add(fld_name)

                # parsed definition, keyed by canonical attribute names
                fld_def2 = { 'Name': fld_name }
                for k, v in fld_def.items():
                    # configuration keys are matched case-insensitively
                    k2 = k.lower()
                    if k2 == 'name':
                        pass
                    elif k2 in ('alternativename', 'alias'):
                        setFieldIf(drvoSupportsFieldAlternativeName,
                                   'AlternativeName', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)
                    elif k2 == 'comment':
                        setFieldIf(drvoSupportsFieldComment,
                                   'Comment', v, fld_def2, fld_name, drvo.ShortName,
                                   log=logging.debug)

                    elif k2 == 'type':
                        fld_def2['Type'] = parseFieldType(v)
                    elif k2 == 'subtype':
                        fld_def2['SubType'] = parseSubFieldType(v)
                    elif k2 == 'tz':
                        if hasTZFlagSupport:
                            fld_def2['TZFlag'] = parseTimeZone(v)
                        else:
                            logging.debug('Ignoring TZ="%s" on field "%s" (OGR v%s is too old)',
                                          v, fld_name, gdal.__version__)
                    elif k2 == 'width' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Width', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'precision' and v is not None and isinstance(v, int):
                        setFieldIf(drvoSupportsFieldWidthPrecision,
                                   'Precision', v, fld_def2, fld_name, drvo.ShortName)

                    # constraints
                    elif k2 == 'default':
                        setFieldIf(drvoSupportsFieldDefault,
                                   'Default', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'nullable' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldNullable,
                                   'Nullable', v, fld_def2, fld_name, drvo.ShortName)
                    elif k2 == 'unique' and v is not None and isinstance(v, bool):
                        setFieldIf(drvoSupportsFieldUnique,
                                   'Unique', v, fld_def2, fld_name, drvo.ShortName)
                    else:
                        # also reached when a known key carries a value of the wrong type
                        raise BadConfiguration(f'Field "{fld_name}" has unknown key "{k}"')

                # replace the raw definition with the parsed one
                fields[idx] = fld_def2

def setOutputFieldMap(defn : ogr.FeatureDefn, sources : list[dict[str, Any]]) -> None:
    """Setup output field mapping, modifying the sources in place.

    For each source, the 'field-map' entry of its 'import' definition is
    replaced with a dictionary mapping input field names to output field
    indices in `defn`, and each rule list found in 'value-map' is
    rewritten as a list of (replace, with) tuples where `replace` is a
    compiled pattern for rules of type 'regex'.

    Raises RuntimeError when a field map refers to a field that `defn`
    does not have, or when a value-map rule is malformed."""
    # output field name -> index in the output layer definition
    fieldMap = {}
    n = defn.GetFieldCount()
    for i in range(n):
        fld = defn.GetFieldDefn(i)
        fldName = fld.GetName()
        fieldMap[fldName] = i

    for source in sources:
        source_import = source['import']

        fieldMap2 = source_import.get('field-map', None)
        if fieldMap2 is None:
            # no explicit mapping: use the output layer's own name -> index map
            fieldMap2 = fieldMap
        else:
            if isinstance(fieldMap2, list):
                # convert list to identity dictionary
                fieldMap2 = { fld: fld for fld in fieldMap2 }

            # replace output field names with their indices (values only
            # are reassigned, so mutating while iterating is safe)
            for ifld, ofld in fieldMap2.items():
                i = fieldMap.get(ofld, None)
                if i is None:
                    raise RuntimeError(f'Ouput layer has no field named "{ofld}"')
                fieldMap2[ifld] = i
        source_import['field-map'] = fieldMap2

        # validate field value mapping
        valueMap = source_import.get('value-map', None)
        if valueMap is not None:
            for fldName, rules in valueMap.items():
                if rules is None:
                    continue
                if not isinstance(rules, list):
                    # a single rule was given: wrap it and store the list
                    # back, so that the normalized tuples written below
                    # end up in the configuration rather than in a
                    # throwaway local list
                    rules = [rules]
                    valueMap[fldName] = rules
                for idx, rule in enumerate(rules):
                    if rule is None or not isinstance(rule, dict):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    # rule type defaults to 'literal'
                    if 'type' not in rule:
                        ruleType = rule['type'] = 'literal'
                    else:
                        ruleType = rule['type']
                    # a rule has exactly the keys 'type', 'replace' and 'with'
                    if ('replace' not in rule or 'with' not in rule or len(rule) != 3 or
                            ruleType is None or ruleType not in ('literal', 'regex')):
                        raise RuntimeError(f'Field "{fldName}" has invalid rule #{idx}: {rule}')
                    if ruleType == 'regex':
                        rule['replace'] = re.compile(rule['replace'])
                    rules[idx] = ( rule['replace'], rule['with'] )

def validate_sources(layers : dict[str, Any]) -> None:
    """Mangle and validate layer sources and import definitions.

    Layers without any source are dropped from `layers` (with a
    warning).  For the remaining ones the 'source' entry of each source
    is replaced with a dictionary holding exactly the keys 'path' and
    'unar'.

    Raises BadConfiguration when a source has no import definition or
    no usable source path."""
    toremove = set()
    for layername, layerdefs in layers.items():
        sources = layerdefs.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Output layer "%s" has no definition, skipping', layername)
            # can't remove while iterating over the dictionary
            toremove.add(layername)
            continue

        for idx, layerdef in enumerate(sources):
            importdef = layerdef.get('import', None)
            if importdef is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                        'has no import definition')

            sourcedef = layerdef.get('source', None)
            unar = None if sourcedef is None else sourcedef.get('unar', None)
            src = None if sourcedef is None else sourcedef.get('path', None)

            ds_srcpath = importdef.get('path', None)
            if src is None and unar is None and ds_srcpath is not None:
                # fallback to import:path if there is no unarchiving recipe
                src = ds_srcpath
            if unar is not None and ds_srcpath is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                        'has no import source path')
            if src is None:
                raise BadConfiguration(f'Source #{idx} of output layer "{layername}" '
                                        'has no source path')
            layerdef['source'] = { 'path': src, 'unar': unar }

    for layername in toremove:
        layers.pop(layername)

# pylint: disable-next=too-many-branches, too-many-statements
def main() -> NoReturn:
    """Parse the command line and configuration, create missing output
    layers, then clear and re-import each configured layer, using
    transactions or savepoints where the output driver allows it.

    Always terminates through sys.exit(): the status is 0 on success and
    1 if importing a layer raised or a transaction could not be
    committed.  Errors occurring before the import phase (bad
    configuration, missing driver capability) propagate as exceptions."""
    common.init_logger(app=os.path.basename(__file__), level=logging.INFO)

    parser = argparse.ArgumentParser(description='Extract and import GIS layers.')
    parser.add_argument('--cachedir', default=None,
        help=f'cache directory (default: {os.curdir})')
    parser.add_argument('--debug', action='count', default=0,
        help=argparse.SUPPRESS)
    parser.add_argument('--lockfile', default=None,
        help='obtain an exclusive lock before starting unpacking and importing')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    if args.debug > 0:
        # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
    if args.debug > 1:
        gdal.ConfigurePythonLogging(enable_debug=True)

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)

    # validate configuration
    if 'dataset' not in config:
        raise BadConfiguration('Configuration does not specify output dataset')

    layers = config.get('layers', {})
    validate_sources(layers)

    # set global GDAL/OGR configuration options
    for pszKey, pszValue in config.get('GDALconfig', {}).items():
        logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
        gdal.SetConfigOption(pszKey, pszValue)

    # open output dataset (possibly create it first)
    dso = openOutputDS(config['dataset'])

    validate_schema(layers,
        drvo=dso.GetDriver(),
        lco_defaults=config['dataset'].get('create-layer-options', None))

    # get configured Spatial Reference System and extent
    srs = getSRS(config.get('SRS', None))
    extent = getExtent(config.get('extent', None), srs=srs)[0]

    if args.lockfile is not None:
        # obtain an exclusive lock and don't release it until exiting the program
        lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644)
        logging.debug('flock("%s", LOCK_EX)', args.lockfile)
        flock(lock_fd, LOCK_EX)

    # create all output layers before starting the transaction
    for layername, layerdef in layers.items():
        lyr = dso.GetLayerByName(layername)
        if lyr is not None:
            # TODO dso.DeleteLayer(layername) if --overwrite and
            # dso.TestCapability(ogr.ODsCDeleteLayer)
            # (Sets OVERWRITE=YES for PostgreSQL and GPKG.)
            continue
        if not dso.TestCapability(ogr.ODsCCreateLayer):
            raise RuntimeError(f'Output driver {dso.GetDriver().ShortName} does not '
                                'support layer creation')
        createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))

    cachedir = Path(args.cachedir) if args.cachedir is not None else None
    # dataset-level transactions are only used with drivers named below,
    # since per-layer isolation then relies on SQL SAVEPOINT statements
    if (dso.TestCapability(ogr.ODsCTransactions) and
            dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
        logging.debug('Starting transaction')
        dsoTransaction = dso.StartTransaction() == ogr.OGRERR_NONE
    else:
        logging.warning('Output driver %s does not support dataset transactions or SQL SAVEPOINTs',
                        dso.GetDriver().ShortName)
        dsoTransaction = False

    rv = 0
    try:
        for layername, layerdef in layers.items():
            logging.info('Processing output layer "%s"', layername)
            lyr = dso.GetLayerByName(layername)
            if lyr is None:
                raise RuntimeError(f'Failed to create output layer "{layername}"??')
            if not lyr.TestCapability(ogr.OLCSequentialWrite):
                raise RuntimeError(f'Output layer "{layername}" has no working '
                                    'CreateFeature() method')
            validateOutputLayer(lyr, srs=srs, options=layerdef['create'])

            sources = layerdef['sources']

            # setup output field mapping in the sources dictionary
            setOutputFieldMap(lyr.GetLayerDefn(), sources)

            # lyrTransaction is either a string (the SAVEPOINT statement
            # issued inside the dataset transaction) or a boolean telling
            # whether a layer-level transaction is currently open
            if dsoTransaction:
                lyrTransaction = 'SAVEPOINT ' + escape_identifier('savept_' + layername)
                logging.debug(lyrTransaction)
                dso.ExecuteSQL(lyrTransaction)
            elif lyr.TestCapability(ogr.OLCTransactions):
                # start transaction if possible
                logging.debug('Starting transaction')
                lyrTransaction = lyr.StartTransaction() == ogr.OGRERR_NONE
            else:
                logging.warning('Unsafe update, output layer "%s" does not support transactions',
                                layername)
                lyrTransaction = False

            try:
                clearLayer(dso, lyr)

                description = layerdef.get('description', None)
                if (description is not None and
                        lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
                    logging.warning('Could not set description metadata')

                importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)

                if isinstance(lyrTransaction, bool) and lyrTransaction:
                    # commit transaction
                    logging.debug('Committing transaction')
                    # mark as closed first so the handler below doesn't
                    # try to roll back if the commit raises
                    lyrTransaction = False
                    if lyr.CommitTransaction() != ogr.OGRERR_NONE:
                        logging.error('Could not commit transaction')
                        rv = 1

            except Exception: # pylint: disable=broad-exception-caught
                if isinstance(lyrTransaction, str):
                    # 'ROLLBACK TO SAVEPOINT <name>'
                    query = 'ROLLBACK TO ' + lyrTransaction
                    logging.exception('Exception occured in transaction, %s', query)
                    logging.debug(query)
                    dso.ExecuteSQL(query)
                elif isinstance(lyrTransaction, bool) and lyrTransaction:
                    logging.exception('Exception occured in transaction, rolling back')
                    try:
                        if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
                            logging.error('Could not rollback transaction')
                    except RuntimeError:
                        logging.exception('Could not rollback transaction')
                else:
                    traceback.print_exc()
                rv = 1

            finally:
                lyr = None # close output layer
                if isinstance(lyrTransaction, str):
                    # 'RELEASE SAVEPOINT <name>'
                    query = 'RELEASE ' + lyrTransaction
                    logging.debug(query)
                    dso.ExecuteSQL(query)

        if dsoTransaction:
            # commit transaction
            logging.debug('Committing transaction')
            # mark as closed first so the handler below doesn't try to
            # roll back if the commit raises
            dsoTransaction = False
            if dso.CommitTransaction() != ogr.OGRERR_NONE:
                logging.error('Could not commit transaction')
                rv = 1

    except Exception: # pylint: disable=broad-exception-caught
        if dsoTransaction:
            logging.exception('Exception occured in transaction, rolling back')
            try:
                if dso.RollbackTransaction() != ogr.OGRERR_NONE:
                    logging.error('Could not rollback transaction')
            except RuntimeError:
                logging.exception('Could not rollback transaction')
        else:
            traceback.print_exc()
        rv = 1

    # drop the references to the GDAL/OGR objects before exiting
    dso = None
    srs = None
    extent = None
    sys.exit(rv)

gdal.UseExceptions()
main()