aboutsummaryrefslogtreecommitdiffstats
path: root/common.py
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-18 14:00:48 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-19 19:25:20 +0200
commit6bd4f5f19928cd2783defca0316bcb6bbc042cd2 (patch)
treea805f5fdd603776c20bc159729e8d01b5adca08f /common.py
parent75eccbb389bf5e8c841e68c2e81854f22d72bad0 (diff)
webmap-import: Major refactoring.
Diffstat (limited to 'common.py')
-rw-r--r--common.py157
1 files changed, 18 insertions, 139 deletions
diff --git a/common.py b/common.py
index acbb5d8..0bece11 100644
--- a/common.py
+++ b/common.py
@@ -56,6 +56,13 @@ class MissingConfiguration(Exception):
def __init__(self, name : str) -> Never:
super().__init__(f'Could not find configuration file {name}')
+class BadConfiguration(Exception):
+ """Exception raised when there is a bad configuration"""
+ def __init__(self, message : str, config_path : Optional[Path] = None) -> Never:
+ if config_path is not None:
+ message = str(config_path) + ': ' + message
+ super().__init__(message)
+
def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
"""Return the configuration file path"""
dirs = [
@@ -77,8 +84,16 @@ def parse_config(path : Optional[Path] = None,
with config_path.open(mode='r', encoding='utf-8') as fp:
config = yaml.safe_load(fp)
- # filter layers that are not of interest
layers = config.get('layers', {})
+ for name, layerdefs in layers.items():
+ if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
+ layers[name] = { 'sources': [layerdefs] }
+ for k in ('description', 'create', 'publish'):
+ if k in layerdefs:
+ layers[name][k] = layerdefs.pop(k)
+ layerdefs = layers[name]
+
+ # filter layers that are not of interest
if groupnames is not None:
layernames = []
layer_groups = config.get('layer-groups', {})
@@ -123,8 +138,8 @@ def parse_config(path : Optional[Path] = None,
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception('Configured extent without SRS')
+ raise BadConfiguration('Configured extent without SRS',
+ config_path=config_path)
return config
@@ -149,142 +164,6 @@ def format_time(ts : float, precision : int = 3) -> str:
h, m = divmod(m, 60)
return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
-# pylint: disable-next=invalid-name, redefined-builtin
-def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool:
- """Return a boolean indicating whether the installer GDAL version is
- greater than or equal to the provider (maj, min, rev) triplet."""
-
- if maj < 1 or (maj == 1 and min < 10):
- # GDAL_VERSION_NUM() macro was changed in 1.10. That version
- # was released in 2013 so we blindly assume the installer
- # version is more recent
- return True
-
- version_cur = int(gdal.VersionInfo())
- # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
- version_min = maj*1000000 + min*10000 + rev*100
- return version_min <= version_cur
-
-# pylint: disable-next=invalid-name
-def gdalGetMetadataItem(obj, k : str) -> bool:
- """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
-
- v = obj.GetMetadataItem(k)
- if v is not None and isinstance(v, str):
- return v.upper() == 'YES'
-
- return False
-
-def escape_identifier(identifier : str) -> str:
- """Escape the given identifier, cf.
- swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
-
- if '\x00' in identifier:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Invalid identifier "{identifier}"')
-
- # SQL:1999 delimited identifier
- return '"' + identifier.replace('"', '""') + '"'
-
-# pylint: disable-next=invalid-name,dangerous-default-value
-def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0):
- """Return a pair kwargs and driver to use with gdal.OpenEx()."""
-
- kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }
-
- fmt = option_dict.get('format', None)
- if fmt is None:
- drv = None
- else:
- drv = gdal.GetDriverByName(fmt)
- if drv is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Unknown driver name "{fmt}"')
- if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
- kwargs['allowed_drivers'] = [ drv.ShortName ]
-
- oo = option_dict.get('open-options', None)
- if oo is not None:
- kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
- return kwargs, drv
-
-# pylint: disable-next=invalid-name
-def getSRS(osr, srs_str : Optional[str]):
- """Return the decoded Spatial Reference System."""
-
- if srs_str is None:
- return None
-
- srs = osr.SpatialReference()
- if srs_str.startswith('EPSG:'):
- code = int(srs_str.removeprefix('EPSG:'))
- srs.ImportFromEPSG(code)
- else:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Unknown SRS {srs_str}')
-
- logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
- return srs
-
-# pylint: disable-next=invalid-name
-def getExtent(extent : tuple[float, float, float, float] | list[float], srs = None):
- """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
- given SRS. Return a pair with the densified and non-densified extent.
- Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
- polygon to make sure it is sufficiently densified when transforming to
- source layer SRS for spatial filtering."""
-
- if extent is None:
- return None, None
-
- if not isinstance(extent, (list, tuple)) or len(extent) != 4:
- # pylint: disable-next=broad-exception-raised
- raise Exception(f'Invalid extent {extent}')
- if srs is None:
- # pylint: disable-next=broad-exception-raised
- raise Exception('Configured extent but no SRS')
-
- logging.debug('Configured extent in %s: %s',
- srs.GetName(), ', '.join(map(str, extent)))
-
- from osgeo import ogr, osr # pylint: disable=import-outside-toplevel
- ogr.UseExceptions()
-
- ring = ogr.Geometry(ogr.wkbLinearRing)
- ring.AddPoint_2D(extent[0], extent[1])
- ring.AddPoint_2D(extent[2], extent[1])
- ring.AddPoint_2D(extent[2], extent[3])
- ring.AddPoint_2D(extent[0], extent[3])
- ring.AddPoint_2D(extent[0], extent[1])
-
- polygon = ogr.Geometry(ogr.wkbPolygon)
- polygon.AddGeometry(ring)
-
- # we expressed extent as minX, minY, maxX, maxY (easting/northing
- # ordered, i.e., in traditional GIS order)
- srs2 = srs.Clone()
- srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
- polygon.AssignSpatialReference(srs2)
- if not srs2.IsSame(srs):
- polygon.TransformTo(srs)
-
- # densify the rectangle to avoid issues when reprojecting to the
- # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
- polygon_dense = polygon.Clone()
- segment_distance_metre = 10 * 1000
- if srs.IsGeographic():
- # pylint: disable-next=invalid-name
- dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
- polygon_dense.Segmentize(dfMaxLength)
- elif srs.IsProjected():
- # pylint: disable-next=invalid-name
- dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
- polygon_dense.Segmentize(dfMaxLength)
-
- return polygon_dense, polygon
-
######
# The function definitions below are taken from cpython's source code