#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (common module)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.
#----------------------------------------------------------------------

# pylint: disable=missing-module-docstring

import os
from os import path as os_path, curdir as os_curdir
import sys
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
from stat import S_ISDIR
import math
import logging
from typing import Any, Optional, Never

from xdg.BaseDirectory import xdg_config_home
import yaml

def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.Logger:
    """Initialize the root logger and return it.

    The root logger's level is set to `level` and one handler is added
    to it.  When the process was started directly by systemd (that is,
    $SYSTEMD_EXEC_PID equals our own PID and $JOURNAL_STREAM is set) the
    handler is a systemd JournalHandler using `app` as syslog
    identifier; otherwise it is a plain StreamHandler.  Either way
    records are formatted as "LEVELNAME: message".

    Note that each call adds a new handler to the root logger."""

    log_fmt = logging.Formatter('%(levelname)s: %(message)s')
    log = logging.getLogger()
    log.setLevel(level)

    pid = os.getenv('SYSTEMD_EXEC_PID', None)
    if (pid is None or int(pid) != os.getpid()
            or os.getenv('JOURNAL_STREAM', None) is None):
        ch = logging.StreamHandler()
    else:
        # started in systemd, use journald for filtering incl. coloring
        from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel
        ch = JournalHandler(SYSLOG_IDENTIFIER=app)

    ch.setFormatter(log_fmt)
    log.addHandler(ch)
    return log

class MissingConfiguration(Exception):
    """Exception raised when no configuration file could be found"""
    # __init__() returns normally, hence None (not Never) is the correct
    # return annotation
    def __init__(self, name : str) -> None:
        super().__init__(f'Could not find configuration file {name}')

def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
    """Return the configuration file path.

    `filename` is looked up, in that order, in the current directory,
    in the `appname` sub-directory of the XDG configuration home, and
    in /etc/`appname`.  The first existing candidate is returned;
    MissingConfiguration is raised when there is none."""
    dirs = [
        Path(),
        Path(xdg_config_home).joinpath(appname),
        PosixPath('/etc').joinpath(appname)
    ]
    for d in dirs:
        p = d.joinpath(filename)
        if p.exists():
            return p
    raise MissingConfiguration(filename)

def parse_config(path : Optional[Path] = None,
                 groupnames : Optional[list[str]] = None) -> dict[str, Any]:
    """Parse the configuration file and return it as a dictionary.

    When `path` is None the file is located using find_config().

    When `groupnames` is not None the 'layers' mapping of the returned
    configuration is restricted to the selected layers.  Each name is
    first looked up in the 'layer-groups' mapping, whose values are a
    shell-like pattern (or a list thereof) matched case-sensitively
    against layer names; a pattern matching nothing is tried as an
    exact layer name.  A name that is not a group is used as a layer
    name.  The process exits with status 1 if a name is neither a group
    nor a layer.

    A list-valued 'extent' is converted to a tuple; an exception is
    raised if an extent is configured without 'SRS'."""

    config_path = find_config() if path is None else path
    with config_path.open(mode='r', encoding='utf-8') as fp:
        config = yaml.safe_load(fp)
    if config is None:
        # yaml.safe_load() yields None for an empty document; treat it
        # as an empty mapping rather than failing on config.get() below
        config = {}

    # filter layers that are not of interest
    layers = config.get('layers', {})
    if groupnames is not None:
        layernames = []
        layer_groups = config.get('layer-groups', {})
        for groupname in groupnames:
            if groupname not in layer_groups:
                if groupname not in layers:
                    logging.error('Unknown group/layer name "%s"', groupname)
                    sys.exit(1)
                # fallback to layer names
                layernames.append(groupname)
                continue

            patterns = layer_groups[groupname]
            if isinstance(patterns, str):
                patterns = [patterns]
            for pat in patterns:
                has_match = False
                for layername in layers:
                    if fnmatchcase(layername, pat):
                        if layername in layernames:
                            logging.debug('Layer "%s" was already added, skipping',
                                          layername)
                        else:
                            layernames.append(layername)
                        has_match = True
                if has_match:
                    pass
                elif pat in layers:
                    # fallback to exact match
                    if pat in layernames:
                        logging.debug('Layer "%s" was already added, skipping', pat)
                    else:
                        layernames.append(pat)
                else:
                    logging.warning('Group name "%s" does not match anything', groupname)

        layers = { name: layers[name] for name in layernames }

    config['layers'] = layers

    extent = config.get('extent', None)
    if extent is not None:
        if isinstance(extent, list):
            config['extent'] = tuple(extent)
        if config.get('SRS', None) is None:
            # pylint: disable-next=broad-exception-raised
            raise Exception('Configured extent without SRS')

    return config

def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
    """Format a number of bytes using binary (1024-based) unit prefixes.

    The value is expressed in the smallest unit among B, kiB, MiB and
    GiB for which the scaled value is below `threshold` (GiB being used
    for anything larger).  Scaled values are printed with `precision`
    decimals, and a narrow no-break space (U+202F) separates the number
    from the unit."""
    if n < threshold:
        return f'{n}\u202FB'
    if n < threshold * 1024:
        return f'{n/1024:.{precision}f}\u202FkiB'
    if n < threshold * 1048576:
        return f'{n/1048576:.{precision}f}\u202FMiB'
    return f'{n/1073741824:.{precision}f}\u202FGiB'

def format_time(ts : float, precision : int = 3) -> str:
    """Format a number of seconds to HH:MM:SS.fff.

    `precision` is the number of decimals used for the seconds.  The
    hour field is zero-padded to two digits and is not wrapped."""
    # field width for the seconds: 2 digits, plus the decimal separator
    # and the decimals if any
    w = 2 if precision == 0 else precision + 3
    # round first so the seconds field can never be displayed as 60
    ts = round(ts, precision)
    m = math.floor(ts/60.)
    s = ts - 60. * m
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'

# pylint: disable-next=invalid-name, redefined-builtin
def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool:
    """Return a boolean indicating whether the installed GDAL version is
    greater than or equal to the provided (maj, min, rev) triplet.

    `gdal` is the GDAL module (or any object with a compatible
    VersionInfo() function)."""

    if maj < 1 or (maj == 1 and min < 10):
        # GDAL_VERSION_NUM() macro was changed in 1.10.  That version
        # was released in 2013 so we blindly assume the installed
        # version is more recent
        return True

    version_cur = int(gdal.VersionInfo())
    # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
    version_min = maj*1000000 + min*10000 + rev*100
    return version_min <= version_cur

# pylint: disable-next=invalid-name
def gdalGetMetadataItem(obj, k : str) -> bool:
    """Wrapper around gdal.MajorObject.GetMetadataItem(name).

    Return True if and only if the metadata item `k` of `obj` is a
    string equal to 'YES' (compared case-insensitively)."""

    v = obj.GetMetadataItem(k)
    # isinstance() is False for None, so no separate None test is needed
    if isinstance(v, str):
        return v.upper() == 'YES'

    return False

def escape_identifier(identifier : str) -> str:
    """Escape the given identifier, cf.
    swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id().

    The identifier is returned within double quotes, with embedded
    double quotes doubled.  An exception is raised if it contains a NUL
    character."""

    if '\x00' in identifier:
        # pylint: disable-next=broad-exception-raised
        raise Exception(f'Invalid identifier "{identifier}"')

    # SQL:1999 delimited identifier
    return '"' + identifier.replace('"', '""') + '"'

# pylint: disable-next=invalid-name
def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = None, flags : int = 0):
    """Return a pair kwargs and driver to use with gdal.OpenEx().

    `option_dict` may hold a driver name under 'format' and a mapping
    of open options under 'open-options'; None is treated as an empty
    dictionary.  `flags` is OR-ed with gdal.OF_VECTOR to form the open
    flags.  The returned driver is None when no format was given.

    An exception is raised if the format is not a known driver name or
    if the driver has no vector capabilities."""

    if option_dict is None:
        option_dict = {}

    kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }

    fmt = option_dict.get('format', None)
    if fmt is None:
        drv = None
    else:
        drv = gdal.GetDriverByName(fmt)
        if drv is None:
            # pylint: disable-next=broad-exception-raised
            raise Exception(f'Unknown driver name "{fmt}"')
        if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
            # pylint: disable-next=broad-exception-raised
            raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
        # restrict gdal.OpenEx() to the requested driver
        kwargs['allowed_drivers'] = [ drv.ShortName ]

    oo = option_dict.get('open-options', None)
    if oo is not None:
        # pass open options as a list of KEY=VALUE strings
        kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
    return kwargs, drv

# pylint: disable-next=invalid-name
def getSRS(osr, srs_str : Optional[str]):
    """Return the decoded Spatial Reference System.

    `osr` is the module used to create the SpatialReference object.
    Only strings of the form 'EPSG:<code>' are supported; an exception
    is raised for any other string.  None is returned when `srs_str` is
    None."""

    if srs_str is None:
        return None

    srs = osr.SpatialReference()
    if srs_str.startswith('EPSG:'):
        code = int(srs_str.removeprefix('EPSG:'))
        srs.ImportFromEPSG(code)
    else:
        # pylint: disable-next=broad-exception-raised
        raise Exception(f'Unknown SRS {srs_str}')

    logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
    return srs

# pylint: disable-next=invalid-name
def getExtent(extent : tuple[float, float, float, float] | list[float] | None, srs = None):
    """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
    given SRS.  Return a pair with the densified and non-densified extent.
    Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
    polygon to make sure it is sufficiently densified when transforming to
    source layer SRS for spatial filtering.

    The pair (None, None) is returned when `extent` is None.  An
    exception is raised if `extent` is not a list or tuple of length 4,
    or if an extent is given without SRS."""

    if extent is None:
        return None, None

    if not isinstance(extent, (list, tuple)) or len(extent) != 4:
        # pylint: disable-next=broad-exception-raised
        raise Exception(f'Invalid extent {extent}')
    if srs is None:
        # pylint: disable-next=broad-exception-raised
        raise Exception('Configured extent but no SRS')

    logging.debug('Configured extent in %s: %s', srs.GetName(),
                  ', '.join(map(str, extent)))

    from osgeo import ogr, osr # pylint: disable=import-outside-toplevel
    ogr.UseExceptions()

    # closed ring: the first corner is repeated at the end
    ring = ogr.Geometry(ogr.wkbLinearRing)
    ring.AddPoint_2D(extent[0], extent[1])
    ring.AddPoint_2D(extent[2], extent[1])
    ring.AddPoint_2D(extent[2], extent[3])
    ring.AddPoint_2D(extent[0], extent[3])
    ring.AddPoint_2D(extent[0], extent[1])

    polygon = ogr.Geometry(ogr.wkbPolygon)
    polygon.AddGeometry(ring)

    # we expressed extent as minX, minY, maxX, maxY (easting/northing
    # ordered, i.e., in traditional GIS order)
    srs2 = srs.Clone()
    srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
    polygon.AssignSpatialReference(srs2)
    if not srs2.IsSame(srs):
        polygon.TransformTo(srs)

    # densify the rectangle to avoid issues when reprojecting to the
    # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
    polygon_dense = polygon.Clone()
    segment_distance_metre = 10 * 1000
    if srs.IsGeographic():
        # math.radians(semi-major axis) is the length of one degree
        # pylint: disable-next=invalid-name
        dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
        polygon_dense.Segmentize(dfMaxLength)
    elif srs.IsProjected():
        # pylint: disable-next=invalid-name
        dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
        polygon_dense.Segmentize(dfMaxLength)

    return polygon_dense, polygon

######
# The function definitions below are taken from cpython's source code
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).

def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
    """Tell whether `path` names a directory.  (Adapted from genericpath.py.)

    `dir_fd` and `follow_symlinks` are handed over to os.stat().  False
    is returned when the stat call fails with OSError or ValueError."""
    try:
        info = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        return False
    else:
        return S_ISDIR(info.st_mode)

def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
    """Tell whether `path` exists.  (Adapted from genericpath.py.)

    `dir_fd` and `follow_symlinks` are handed over to os.stat().  False
    is returned when the stat call fails with OSError or ValueError."""
    try:
        os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        found = False
    else:
        found = True
    return found

def makedirs(name : str, mode : int = 0o777, exist_ok : bool = False,
             dir_fd : Optional[int] = None) -> None:
    """Create directory `name` along with any missing parent.  (Adapted
    from os.py.)

    `mode` is only passed to the mkdir call for `name` itself, not to
    the recursive calls creating the parents.  Unless `exist_ok` is
    true and `name` is a directory, the OSError raised by the mkdir
    call is propagated.  `dir_fd` is passed to the underlying stat and
    mkdir calls."""
    parent, leaf = os_path.split(name)
    if not leaf:
        # nothing after the last separator: split once more
        parent, leaf = os_path.split(parent)

    if parent and leaf and not exists(parent, dir_fd=dir_fd):
        try:
            makedirs(parent, exist_ok=exist_ok, dir_fd=dir_fd)
        except FileExistsError:
            # Someone else created the parent in the meantime, which
            # is fine
            pass
        here = bytes(os_curdir, 'ASCII') if isinstance(leaf, bytes) else os_curdir
        if leaf == here:
            # xxx/newdir/. exists if xxx/newdir exists
            return

    logging.debug('mkdir("%s", 0%o)', name, mode)
    try:
        os.mkdir(name, mode, dir_fd=dir_fd)
    except OSError:
        # EEXIST cannot be relied upon, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if exist_ok and isdir(name, dir_fd=dir_fd):
            return
        raise