#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (common module)
# Copyright © 2024 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

import os, sys
from os import path
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
from urllib.parse import urlparse, urlunparse
from stat import S_ISDIR
from math import modf
from xdg.BaseDirectory import xdg_config_home
import logging
import yaml


def init_logger(app=__file__, level=logging.WARNING):
    """Configure the root logger and return it.

    The root logger's level is set to `level` and a new handler using the
    '%(levelname)s: %(message)s' format is attached to it.  The handler is
    a journald handler (tagged with `app` as SYSLOG_IDENTIFIER) when both
    the SYSTEMD_EXEC_PID and JOURNAL_STREAM environment variables are set,
    and a plain stream handler otherwise.

    Note that a handler is added on every call.
    """
    log_fmt = logging.Formatter('%(levelname)s: %(message)s')
    log = logging.getLogger()
    log.setLevel(level)

    if os.getenv('SYSTEMD_EXEC_PID', None) is None or os.getenv('JOURNAL_STREAM', None) is None:
        ch = logging.StreamHandler()
    else:
        # started in systemd, use journald for filtering incl. coloring
        # (imported lazily so the systemd module is only needed in that case)
        from systemd.journal import JournalHandler
        ch = JournalHandler(SYSLOG_IDENTIFIER=app)

    ch.setFormatter(log_fmt)
    log.addHandler(ch)
    return log


def _find_config_file():
    """Return the first existing config.yml among the default locations.

    The current directory is tried first, then the 'webmap' directory
    under the XDG configuration home, then /etc/webmap.  Raise Exception
    when none of them contains a config.yml.
    """
    candidates = [
        Path(),
        Path(xdg_config_home).joinpath('webmap'),
        PosixPath('/etc').joinpath('webmap'),
    ]
    for directory in candidates:
        candidate = str(directory.joinpath('config.yml'))
        if os.path.exists(candidate):
            return candidate
    raise Exception('Could not find configuration file')


def _normalize_source(name, source, destinations):
    """Normalize the 'download' and 'cache' settings of one source, in place.

    `name` is the name of the enclosing layer (used in error messages).
    `destinations` maps each cache path seen so far to its
    {'url', 'module'} origin; it is updated with this source's entry.
    Raise Exception when the cache file name cannot be inferred or when
    two different origins share the same cache path.
    """
    download = source.get('download', None)
    if download is None:
        url = None
        dl_module = None
    elif isinstance(download, str):
        # shorthand form: a bare URL is expanded to { 'url': URL }
        url = download
        dl_module = None
        source['download'] = download = {'url': url}
    else:
        url = download.get('url', None)
        dl_module = download.get('module', None)

    if url is None:
        urlp = None
    else:
        urlp = urlparse(url)
        if urlp is None:
            raise Exception(f'urlparse({url}) failed')

    cache = source.get('cache', None)
    if cache is None or isinstance(cache, str):
        # shorthand form: a bare path (or nothing) is expanded to { 'path': PATH }
        source['cache'] = {'path': cache}
    else:
        cache = cache.get('path', None)

    if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
        # the cache path denotes a directory (or is unset):
        # infer filename from the source URL
        if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
            raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
        p = PosixPath(urlp.path)
        if p is None or p.name is None or p.name == '':
            raise Exception(f'Invalid PosixPath({urlp.path})')
        if cache is None or cache == '':
            cache = Path()
        else:
            cache = Path(cache)
        cache = cache.joinpath(p.name)
    else:
        cache = Path(cache)
    source['cache']['path'] = cache

    v = {'url': urlp, 'module': dl_module}
    if cache in destinations and destinations[cache] != v:
        # allow destination conflicts, but only when the source URL and module match
        raise Exception(f'Destination conflict for layer "{name}"')
    destinations[cache] = v


def _normalize_layers(layers):
    """Validate all layer definitions and normalize them, in place.

    A layer defined as a single mapping without a 'sources' key is wrapped
    into { 'sources': [mapping] }, with its 'description' and 'create'
    keys moved up to the layer level.  Raise Exception for a layer
    lacking sources or for invalid/conflicting source definitions.
    """
    destinations = {}
    for name, layerdefs in layers.items():
        if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
            # replacing the value of an existing key is safe while iterating
            layers[name] = {'sources': [layerdefs]}
            for k in ['description', 'create']:
                if k in layerdefs:
                    layers[name][k] = layerdefs.pop(k)
            layerdefs = layers[name]

        if 'sources' not in layerdefs:
            raise Exception(f'Layer "{name}" does not have any source receipe')

        for sourcedef in layerdefs.get('sources', []):
            source = sourcedef.get('source', None)
            if source is None:
                continue
            _normalize_source(name, source, destinations)


def _select_layernames(layers, layer_groups, groupnames):
    """Return the names of the layers selected by `groupnames`.

    Each entry of `groupnames` is looked up in `layer_groups`, whose
    values are a glob pattern or a list of glob patterns matched
    (case-sensitively) against the layer names; an entry that is not a
    group falls back to a layer name.  An entry that is neither is logged
    as an error and terminates the program with exit status 1.
    """
    layernames = []
    for groupname in groupnames:
        if groupname not in layer_groups:
            if groupname in layers:
                # fallback to layer names
                layernames.append(groupname)
            else:
                logging.error('Unknown group/layer name "%s"', groupname)
                # sys.exit() rather than the exit() builtin, which is only
                # injected by the site module and may be unavailable
                sys.exit(1)
            continue

        patterns = layer_groups[groupname]
        if isinstance(patterns, str):
            patterns = [patterns]
        for pat in patterns:
            has_match = False
            for layername in layers:
                if fnmatchcase(layername, pat):
                    if layername in layernames:
                        logging.debug('Layer "%s" was already added, skipping', layername)
                    else:
                        layernames.append(layername)
                    has_match = True
            if has_match:
                pass
            elif pat in layers:
                # fallback to exact match
                if pat in layernames:
                    logging.debug('Layer "%s" was already added, skipping', pat)
                else:
                    layernames.append(pat)
            else:
                logging.warning('Group name "%s" does not match anything', groupname)
    return layernames


def load_config(path=None, groupnames=None):
    """Load, validate and normalize the YAML configuration.

    path: configuration file to load; when None, config.yml is searched
        for in the current directory, in the 'webmap' directory under the
        XDG configuration home, and in /etc/webmap (in that order).
    groupnames: optional iterable of layer group names (or layer names)
        restricting the layers kept in the configuration; None keeps all
        layers.

    The resulting dictionary is not returned but stored as the `config`
    attribute of this module.

    Raise Exception when no configuration file is found, when the file
    does not contain a mapping, when a layer or source definition is
    invalid, or when an extent is configured without SRS.  An unknown
    group/layer name in `groupnames` terminates the program with exit
    status 1.
    """
    if path is None:
        path = _find_config_file()

    # YAML files are read as UTF-8 regardless of the current locale
    with open(path, 'r', encoding='utf-8') as fp:
        config = yaml.safe_load(fp)
    if not isinstance(config, dict):
        # e.g., an empty file is loaded as None
        raise Exception(f'Invalid configuration file "{path}": top-level mapping expected')

    layers = config.get('layers', {})

    # validate sources
    _normalize_layers(layers)

    # filter layers that are not of interest
    if groupnames is not None:
        layer_groups = config.get('layer-groups', {})
        layernames = _select_layernames(layers, layer_groups, groupnames)
        layers = {name: layers[name] for name in layernames}

    config['layers'] = layers

    extent = config.get('extent', None)
    if extent is not None:
        if isinstance(extent, list):
            config['extent'] = tuple(extent)
        if config.get('SRS', None) is None:
            raise Exception('Configured extent without SRS')

    sys.modules[__name__].config = config


def format_bytes(n):
    """Return a human-readable representation of the byte count `n`.

    The unit (B, kiB, MiB or GiB) is switched once the value reaches 768
    of the smaller unit; the number and its unit are separated by a
    narrow no-break space (U+202F).
    """
    if n < 768:
        return f'{n}\u202FB'
    elif n < 768*1024:
        return f'{n/1024:.2f}\u202FkiB'
    elif n < 768*1024*1024:
        return f'{n/1048576:.2f}\u202FMiB'
    else:
        return f'{n/1073741824:.2f}\u202FGiB'


def format_time(s):
    """Format a duration of `s` seconds as 'HH:MM:SS.mmm'."""
    # Round to the displayed precision *before* splitting into fields, so
    # that a carry propagates to the minutes/hours instead of yielding a
    # seconds field of '60.000' (e.g., for s=59.9996).
    s = round(s, 3)
    fs, s = modf(s)
    m, s = divmod(int(s), 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'


def gdal_version_min(maj=0, min=0, rev=0):
    """Return a boolean indicating whether the installed GDAL version is
    greater than or equal to the provided (maj, min, rev) triplet."""
    if maj < 1 or (maj == 1 and min < 10):
        # GDAL_VERSION_NUM() macro was changed in 1.10.  That version
        # was released in 2013 so we blindly assume the installed
        # version is more recent
        return True

    # imported lazily so GDAL is only required when the check is needed
    from osgeo import gdal
    version_cur = int(gdal.VersionInfo())
    # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
    version_min = maj*1000000 + min*10000 + rev*100
    return version_min <= version_cur


######
# The function definitions below are taken from cpython's source code
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).

def isdir(path, dir_fd=None, follow_symlinks=True):
    """Return whether `path` is a directory.  (From genericpath.py.)

    `dir_fd` and `follow_symlinks` are passed to os.stat().  Return False
    when os.stat() fails with OSError or ValueError.
    """
    try:
        st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        return False
    return S_ISDIR(st.st_mode)


def exists(path, dir_fd=None, follow_symlinks=True):
    """Return whether `path` exists.  (From genericpath.py.)

    `dir_fd` and `follow_symlinks` are passed to os.stat().  Return False
    when os.stat() fails with OSError or ValueError.
    """
    try:
        os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
    except (OSError, ValueError):
        return False
    return True


def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
    """Create a leaf directory and all intermediate ones.  (From os.py.)

    name: path of the directory to create, resolved relative to `dir_fd`
        by the underlying os.stat() and os.mkdir() calls.
    mode: mode passed to os.mkdir() for the leaf directory; intermediate
        directories are created with the default mode.
    exist_ok: when true, do not raise if the leaf already exists as a
        directory.
    logging: optional object with a debug() method, called before each
        os.mkdir() call.

    Raise OSError when os.mkdir() fails, unless `exist_ok` is true and
    `name` is an existing directory.
    """
    head, tail = path.split(name)
    if not tail:
        head, tail = path.split(head)
    if head and tail and not exists(head, dir_fd=dir_fd):
        try:
            makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
        except FileExistsError:
            # Defeats race condition when another thread created the path
            pass
        cdir = os.curdir
        if isinstance(tail, bytes):
            cdir = bytes(os.curdir, 'ASCII')
        if tail == cdir:
            # xxx/newdir/. exists if xxx/newdir exists
            return
    if logging is not None:
        logging.debug('mkdir("%s", 0%o)', name, mode)
    try:
        os.mkdir(name, mode, dir_fd=dir_fd)
    except OSError:
        # Cannot rely on checking for EEXIST, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if not exist_ok or not isdir(name, dir_fd=dir_fd):
            raise