#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (common module)
# Copyright © 2024 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

import os
import sys
from os import path
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
from urllib.parse import urlparse, urlunparse
from stat import S_ISDIR
import math
from math import modf
from xdg.BaseDirectory import xdg_config_home
import logging
import yaml


def init_logger(app=__file__, level=logging.WARNING):
    """Configure the root logger and return it.

    A JournalHandler is used when the process was started directly by
    systemd (SYSTEMD_EXEC_PID matches our PID and JOURNAL_STREAM is
    set); otherwise a plain StreamHandler is used.

    app   -- value passed as SYSLOG_IDENTIFIER to the journal handler
    level -- logging level set on the root logger
    """
    log_fmt = logging.Formatter('%(levelname)s: %(message)s')
    log = logging.getLogger()
    log.setLevel(level)

    pid = os.getenv('SYSTEMD_EXEC_PID', None)
    if (pid is None or int(pid) != os.getpid()
            or os.getenv('JOURNAL_STREAM', None) is None):
        ch = logging.StreamHandler()
    else:
        # started in systemd, use journald for filtering incl. coloring
        from systemd.journal import JournalHandler
        ch = JournalHandler(SYSLOG_IDENTIFIER=app)

    ch.setFormatter(log_fmt)
    log.addHandler(ch)
    return log


def load_config(path=None, groupnames=None):
    """Load and validate the YAML configuration.

    path       -- configuration file; when None, 'config.yml' is looked
                  up in the current directory, then under
                  xdg_config_home/webmap, then under /etc/webmap
    groupnames -- optional iterable of layer group names (or layer
                  names) used to restrict the set of layers

    The resulting dictionary is not returned: it is stored as the
    `config` attribute of this module.

    Raises Exception when no configuration file is found or when the
    layer definitions are invalid.  Terminates with exit status 1 when
    a requested group/layer name is unknown.
    """
    if path is None:
        for p in [Path(),
                  Path(xdg_config_home).joinpath('webmap'),
                  PosixPath('/etc').joinpath('webmap')]:
            p = str(p.joinpath('config.yml'))
            if os.path.exists(p):
                path = p
                break
        if path is None:
            raise Exception('Could not find configuration file')
    with open(path, 'r') as fp:
        config = yaml.safe_load(fp)

    layers = config.get('layers', {})

    # validate sources
    destinations = {}
    for name, layerdefs in layers.items():
        if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
            # short form: a single source definition; wrap it into the
            # long form and hoist the layer-level keys
            layers[name] = {'sources': [layerdefs]}
            for k in ['description', 'create', 'publish']:
                if k in layerdefs:
                    layers[name][k] = layerdefs.pop(k)
            layerdefs = layers[name]

        if 'sources' not in layerdefs:
            raise Exception(f'Layer "{name}" does not have any source receipe')

        for sourcedef in layerdefs.get('sources', []):
            source = sourcedef.get('source', None)
            if source is None:
                continue

            download = source.get('download', None)
            if download is None:
                url = None
                dl_module = None
            elif isinstance(download, str):
                # short form: normalize to a dictionary
                url = download
                dl_module = None
                source['download'] = download = {'url': url}
            else:
                url = download.get('url', None)
                dl_module = download.get('module', None)

            if url is None:
                urlp = None
            else:
                urlp = urlparse(url)
                if urlp is None:
                    raise Exception(f'urlparse({url}) failed')

            cache = source.get('cache', None)
            if cache is None or isinstance(cache, str):
                source['cache'] = {'path': cache}
            else:
                cache = cache.get('path', None)

            if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
                # infer filename from the source URL
                if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
                    raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
                p = PosixPath(urlp.path)
                if p is None or p.name is None or p.name == '':
                    raise Exception(f'Invalid PosixPath({urlp.path})')
                if cache is None or cache == '':
                    cache = Path()
                else:
                    cache = Path(cache)
                cache = cache.joinpath(p.name)
            else:
                cache = Path(cache)
            source['cache']['path'] = cache

            v = {'url': urlp, 'module': dl_module}
            if cache in destinations and destinations[cache] != v:
                # allow destination conflicts, but only when the source URL and module match
                raise Exception(f'Destination conflict for layer "{name}"')
            destinations[cache] = v

    # filter layers that are not of interest
    if groupnames is not None:
        layernames = []
        layer_groups = config.get('layer-groups', {})
        for groupname in groupnames:
            if groupname not in layer_groups:
                if groupname in layers:
                    # fallback to layer names
                    layernames.append(groupname)
                else:
                    logging.error('Unknown group/layer name "%s"', groupname)
                    # sys.exit() rather than the exit() builtin, which
                    # is only injected by the site module
                    sys.exit(1)
            else:
                patterns = layer_groups[groupname]
                if isinstance(patterns, str):
                    patterns = [patterns]
                for pat in patterns:
                    has_match = False
                    for layername in layers:
                        if fnmatchcase(layername, pat):
                            if layername in layernames:
                                logging.debug('Layer "%s" was already added, skipping', layername)
                            else:
                                layernames.append(layername)
                            has_match = True
                    if has_match:
                        pass
                    elif pat in layers:
                        # fallback to exact match
                        if pat in layernames:
                            logging.debug('Layer "%s" was already added, skipping', pat)
                        else:
                            layernames.append(pat)
                    else:
                        logging.warning('Group name "%s" does not match anything', groupname)

        layers = {name: layers[name] for name in layernames}

    config['layers'] = layers

    extent = config.get('extent', None)
    if extent is not None:
        if isinstance(extent, list):
            config['extent'] = tuple(extent)
        if config.get('SRS', None) is None:
            raise Exception('Configured extent without SRS')

    # expose the configuration as an attribute of this module
    sys.modules[__name__].config = config


def format_bytes(n):
    """Format the byte count n with a binary unit (B, kiB, MiB or GiB).

    The next larger unit is used from 768 (= 0.75 * 1024) units on; the
    number and its unit are separated by U+202F (narrow no-break space).
    """
    if n < 768:
        return f'{n}\u202FB'
    elif n < 768*1024:
        return f'{n/1024:.2f}\u202FkiB'
    elif n < 768*1024*1024:
        return f'{n/1048576:.2f}\u202FMiB'
    else:
        return f'{n/1073741824:.2f}\u202FGiB'


def format_time(s):
    """Format a duration of s seconds as HH:MM:SS.mmm."""
    fs, s = modf(s)  # fractional and integral parts
    m, s = divmod(int(s), 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'


# Return a boolean indicating whether the installed GDAL version is
# greater than or equal to the provided (maj, min, rev) triplet.
def gdalVersionMin(maj=0, min=0, rev=0):
    if maj < 1 or (maj == 1 and min < 10):
        # GDAL_VERSION_NUM() macro was changed in 1.10.  That version
        # was released in 2013 so we blindly assume the installed
        # version is more recent
        return True

    from osgeo import gdal
    gdal.UseExceptions()

    version_cur = int(gdal.VersionInfo())
    # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
    version_min = maj*1000000 + min*10000 + rev*100
    return version_min <= version_cur


# Wrapper around gdal.MajorObject.GetMetadataItem(name): return True if
# and only if metadata item k of object o is the string 'YES' (compared
# case-insensitively).
def gdalGetMetadataItem(o, k):
    v = o.GetMetadataItem(k)
    if v is not None and isinstance(v, str):
        return v.upper() == 'YES'
    else:
        return False


# Escape the given identifier, cf.
# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()
# Raise an Exception if the identifier contains a NUL character.
def escapeIdentifier(identifier):
    if '\x00' in identifier:
        raise Exception(f'Invalid identifier "{identifier}"')
    # SQL:1999 delimited identifier
    return '"' + identifier.replace('"', '""') + '"'


# Return a pair kwargs and driver to use with gdal.OpenEx().  The driver
# is None unless option_dict has a 'format' key; an Exception is raised
# if that format is unknown or has no vector capabilities.
def gdalSetOpenExArgs(option_dict, flags=0):
    from osgeo import gdal
    gdal.UseExceptions()

    kwargs = {'nOpenFlags': gdal.OF_VECTOR | flags}

    fmt = option_dict.get('format', None)
    if fmt is None:
        drv = None
    else:
        drv = gdal.GetDriverByName(fmt)
        if drv is None:
            raise Exception(f'Unknown driver name "{fmt}"')
        elif not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
            raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
        kwargs['allowed_drivers'] = [drv.ShortName]

    oo = option_dict.get('open-options', None)
    if oo is not None:
        # GDAL expects open options as a list of 'KEY=VALUE' strings
        kwargs['open_options'] = [k + '=' + str(v) for k, v in oo.items()]
    return kwargs, drv


# Return the decoded Spatial Reference System, or None if srs_str is
# None.  Only the 'EPSG:<code>' syntax is supported; an Exception is
# raised for anything else.
def getSRS(srs_str):
    if srs_str is None:
        return

    from osgeo import osr
    osr.UseExceptions()

    srs = osr.SpatialReference()
    if srs_str.startswith('EPSG:'):
        code = int(srs_str.removeprefix('EPSG:'))
        srs.ImportFromEPSG(code)
    else:
        raise Exception(f'Unknown SRS {srs_str}')
    logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
    return srs


# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
# given SRS.  Return a pair with the densified and non-densified extent.
# Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
# polygon to make sure it is sufficiently densified when transforming to
# source layer SRS for spatial filtering.
# Return (None, None) if extent is None; raise an Exception if extent is
# not a list or tuple of length 4, or if no SRS is given.
def getExtent(extent, srs=None):
    if extent is None:
        return None, None

    if not isinstance(extent, (list, tuple)) or len(extent) != 4:
        raise Exception(f'Invalid extent {extent}')
    elif srs is None:
        raise Exception('Configured extent but no SRS')

    logging.debug('Configured extent in %s: %s',
                  srs.GetName(), ', '.join(map(str, extent)))

    from osgeo import ogr, osr
    ogr.UseExceptions()

    # closed ring: the first point is repeated at the end
    ring = ogr.Geometry(ogr.wkbLinearRing)
    ring.AddPoint_2D(extent[0], extent[1])
    ring.AddPoint_2D(extent[2], extent[1])
    ring.AddPoint_2D(extent[2], extent[3])
    ring.AddPoint_2D(extent[0], extent[3])
    ring.AddPoint_2D(extent[0], extent[1])

    polygon = ogr.Geometry(ogr.wkbPolygon)
    polygon.AddGeometry(ring)

    # we expressed extent as minX, minY, maxX, maxY (easting/northing
    # ordered, i.e., in traditional GIS order)
    srs2 = srs.Clone()
    srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
    polygon.AssignSpatialReference(srs2)
    if not srs2.IsSame(srs):
        polygon.TransformTo(srs)

    # densify the rectangle to avoid issues when reprojecting to the
    # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
    polygon_dense = polygon.Clone()
    segment_distance_metre = 10 * 1000
    if srs.IsGeographic():
        # semi-major axis * pi/180, i.e., the length of one degree
        dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
        polygon_dense.Segmentize(dfMaxLength)
    elif srs.IsProjected():
        dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
        polygon_dense.Segmentize(dfMaxLength)

    return polygon_dense, polygon


######
# The function definitions below are taken from cpython's source code
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).

# Is a path a directory?
# (From genericpath.py.)
def isdir(path, dir_fd=None, follow_symlinks=True):
    try:
        st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        return False
    return S_ISDIR(st.st_mode)


# Does a path exist?
# (From genericpath.py.)
def exists(path, dir_fd=None, follow_symlinks=True):
    try:
        os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        return False
    return True


# Create a leaf directory and all intermediate ones.
# (From os.py.)
# If `logging` is not None, its debug() method is called before each
# os.mkdir() call.
def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
    head, tail = path.split(name)
    if not tail:
        # name ends with a separator: split once more
        head, tail = path.split(head)
    if head and tail and not exists(head, dir_fd=dir_fd):
        try:
            makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
        except FileExistsError:
            # Defeats race condition when another thread created the path
            pass
        cdir = os.curdir
        if isinstance(tail, bytes):
            cdir = bytes(os.curdir, 'ASCII')
        if tail == cdir:
            # xxx/newdir/. exists if xxx/newdir exists
            return
    if logging is not None:
        logging.debug('mkdir("%s", 0%o)', name, mode)
    try:
        os.mkdir(name, mode, dir_fd=dir_fd)
    except OSError:
        # Cannot rely on checking for EEXIST, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if not exist_ok or not isdir(name, dir_fd=dir_fd):
            raise