diff options
author | Guilhem Moulin <guilhem@fripost.org> | 2025-04-18 14:00:48 +0200 |
---|---|---|
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-04-19 19:25:20 +0200 |
commit | 6bd4f5f19928cd2783defca0316bcb6bbc042cd2 (patch) | |
tree | a805f5fdd603776c20bc159729e8d01b5adca08f /common.py | |
parent | 75eccbb389bf5e8c841e68c2e81854f22d72bad0 (diff) |
webmap-import: Major refactoring.
Diffstat (limited to 'common.py')
-rw-r--r-- | common.py | 157 |
1 files changed, 18 insertions, 139 deletions
@@ -56,6 +56,13 @@ class MissingConfiguration(Exception): def __init__(self, name : str) -> Never: super().__init__(f'Could not find configuration file {name}') +class BadConfiguration(Exception): + """Exception raised when there is a bad configuration""" + def __init__(self, message : str, config_path : Optional[Path] = None) -> Never: + if config_path is not None: + message = str(config_path) + ': ' + message + super().__init__(message) + def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path: """Return the configuration file path""" dirs = [ @@ -77,8 +84,16 @@ def parse_config(path : Optional[Path] = None, with config_path.open(mode='r', encoding='utf-8') as fp: config = yaml.safe_load(fp) - # filter layers that are not of interest layers = config.get('layers', {}) + for name, layerdefs in layers.items(): + if isinstance(layerdefs, dict) and 'sources' not in layerdefs: + layers[name] = { 'sources': [layerdefs] } + for k in ('description', 'create', 'publish'): + if k in layerdefs: + layers[name][k] = layerdefs.pop(k) + layerdefs = layers[name] + + # filter layers that are not of interest if groupnames is not None: layernames = [] layer_groups = config.get('layer-groups', {}) @@ -123,8 +138,8 @@ def parse_config(path : Optional[Path] = None, if isinstance(extent, list): config['extent'] = tuple(extent) if config.get('SRS', None) is None: - # pylint: disable-next=broad-exception-raised - raise Exception('Configured extent without SRS') + raise BadConfiguration('Configured extent without SRS', + config_path=config_path) return config @@ -149,142 +164,6 @@ def format_time(ts : float, precision : int = 3) -> str: h, m = divmod(m, 60) return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}' -# pylint: disable-next=invalid-name, redefined-builtin -def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool: - """Return a boolean indicating whether the installer GDAL version is - greater than or equal to the provider (maj, min, rev) triplet.""" - - if maj < 1 or (maj == 1 and min < 10): - # GDAL_VERSION_NUM() macro was changed in 1.10. That version - # was released in 2013 so we blindly assume the installer - # version is more recent - return True - - version_cur = int(gdal.VersionInfo()) - # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in - version_min = maj*1000000 + min*10000 + rev*100 - return version_min <= version_cur - -# pylint: disable-next=invalid-name -def gdalGetMetadataItem(obj, k : str) -> bool: - """Wrapper around gdal.MajorObject.GetMetadataItem(name).""" - - v = obj.GetMetadataItem(k) - if v is not None and isinstance(v, str): - return v.upper() == 'YES' - - return False - -def escape_identifier(identifier : str) -> str: - """Escape the given identifier, cf. - swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id().""" - - if '\x00' in identifier: - # pylint: disable-next=broad-exception-raised - raise Exception(f'Invalid identifier "{identifier}"') - - # SQL:1999 delimited identifier - return '"' + identifier.replace('"', '""') + '"' - -# pylint: disable-next=invalid-name,dangerous-default-value -def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0): - """Return a pair kwargs and driver to use with gdal.OpenEx().""" - - kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags } - - fmt = option_dict.get('format', None) - if fmt is None: - drv = None - else: - drv = gdal.GetDriverByName(fmt) - if drv is None: - # pylint: disable-next=broad-exception-raised - raise Exception(f'Unknown driver name "{fmt}"') - if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR): - # pylint: disable-next=broad-exception-raised - raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities') - kwargs['allowed_drivers'] = [ drv.ShortName ] - - oo = option_dict.get('open-options', None) - if oo is not None: - kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ] - return kwargs, drv - -# pylint: disable-next=invalid-name -def getSRS(osr, srs_str : Optional[str]): - """Return the decoded Spatial Reference System.""" - - if srs_str is None: - return None - - srs = osr.SpatialReference() - if srs_str.startswith('EPSG:'): - code = int(srs_str.removeprefix('EPSG:')) - srs.ImportFromEPSG(code) - else: - # pylint: disable-next=broad-exception-raised - raise Exception(f'Unknown SRS {srs_str}') - - logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName()) - return srs - -# pylint: disable-next=invalid-name -def getExtent(extent : tuple[float, float, float, float] | list[float], srs = None): - """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the - given SRS. Return a pair with the densified and non-densified extent. - Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the - polygon to make sure it is sufficiently densified when transforming to - source layer SRS for spatial filtering.""" - - if extent is None: - return None, None - - if not isinstance(extent, (list, tuple)) or len(extent) != 4: - # pylint: disable-next=broad-exception-raised - raise Exception(f'Invalid extent {extent}') - if srs is None: - # pylint: disable-next=broad-exception-raised - raise Exception('Configured extent but no SRS') - - logging.debug('Configured extent in %s: %s', - srs.GetName(), ', '.join(map(str, extent))) - - from osgeo import ogr, osr # pylint: disable=import-outside-toplevel - ogr.UseExceptions() - - ring = ogr.Geometry(ogr.wkbLinearRing) - ring.AddPoint_2D(extent[0], extent[1]) - ring.AddPoint_2D(extent[2], extent[1]) - ring.AddPoint_2D(extent[2], extent[3]) - ring.AddPoint_2D(extent[0], extent[3]) - ring.AddPoint_2D(extent[0], extent[1]) - - polygon = ogr.Geometry(ogr.wkbPolygon) - polygon.AddGeometry(ring) - - # we expressed extent as minX, minY, maxX, maxY (easting/northing - # ordered, i.e., in traditional GIS order) - srs2 = srs.Clone() - srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) - polygon.AssignSpatialReference(srs2) - if not srs2.IsSame(srs): - polygon.TransformTo(srs) - - # densify the rectangle to avoid issues when reprojecting to the - # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter() - polygon_dense = polygon.Clone() - segment_distance_metre = 10 * 1000 - if srs.IsGeographic(): - # pylint: disable-next=invalid-name - dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor()) - polygon_dense.Segmentize(dfMaxLength) - elif srs.IsProjected(): - # pylint: disable-next=invalid-name - dfMaxLength = segment_distance_metre / srs.GetLinearUnits() - polygon_dense.Segmentize(dfMaxLength) - - return polygon_dense, polygon - ###### # The function definitions below are taken from cpython's source code |