#---------------------------------------------------------------------- # Backend utilities for the Klimatanalys Norr project (common module) # Copyright © 2024-2025 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- # pylint: disable=missing-module-docstring import os from os import path as os_path, curdir as os_curdir import sys from fnmatch import fnmatchcase from pathlib import Path, PosixPath from stat import S_ISDIR import math import logging from typing import Any, Iterator, Optional, Never from hashlib import sha256 from xdg.BaseDirectory import xdg_config_home import yaml def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.Logger: """Initialize the logger""" log_fmt = logging.Formatter('%(levelname)s: %(message)s') log = logging.getLogger() log.setLevel(level) if (os.getenv('SYSTEMD_EXEC_PID', None) is None or os.getenv('INVOCATION_ID', None) is None or os.getenv('JOURNAL_STREAM', None) is None): ch = logging.StreamHandler() else: # started in systemd, use journald for filtering incl. coloring from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel ch = JournalHandler(SYSLOG_IDENTIFIER=app) ch.setFormatter(log_fmt) log.addHandler(ch) return log class MissingConfiguration(Exception): """Exception raised when no configuration file could be found""" def __init__(self, name : str) -> Never: super().__init__(f'Could not find configuration file {name}') class BadConfiguration(Exception): """Exception raised when there is a bad configuration""" def __init__(self, message : str, config_path : Optional[Path] = None) -> Never: if config_path is not None: message = str(config_path) + ': ' + message super().__init__(message) def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path: """Return the configuration file path""" dirs = [ Path(), Path(xdg_config_home).joinpath(appname), PosixPath('/etc').joinpath(appname) ] for d in dirs: p = d.joinpath(filename) if p.exists(): return p raise MissingConfiguration(filename) def load_config(path : Optional[Path] = None) -> dict[str, Any]: """Load configuration file""" config_path = find_config() if path is None else path with config_path.open(mode='r', encoding='utf-8') as fp: return yaml.safe_load(fp) def layers_in_group(groupname : str, patterns : str|list[str], layernames : set[str]) -> Iterator[str]: """Get layer names matching the given patterns""" if isinstance(patterns, str): patterns = [patterns] for pat in patterns: has_match = False for layername in layernames: if fnmatchcase(layername, pat): yield layername has_match = True if has_match: continue if pat in layernames: # fallback to exact match yield pat else: logging.warning('Pattern "%s" in group "%s" does not match anything', pat, groupname) def parse_config(path : Optional[Path] = None, groupnames : Optional[list[str]] = None) -> dict[str, Any]: """Parse configuration file""" config = load_config(path) layers = config.get('layers', {}) for name, layerdefs in layers.items(): if isinstance(layerdefs, dict) and 'sources' not in layerdefs: layers[name] = { 'sources': [layerdefs] } for k in ('description', 'create', 'publish'): if k in layerdefs: layers[name][k] = layerdefs.pop(k) layerdefs = layers[name] # filter layers that are not of interest if groupnames is not None: layernames = set() layernames_all = set(layers.keys()) layer_groups = config.get('layer-groups', {}) for groupname in groupnames: if groupname in layer_groups: for name in layers_in_group(groupname, layer_groups[groupname], layernames_all): if name in layernames: logging.debug('Layer "%s" was already added, skipping', name) else: layernames.add(name) elif groupname in layers: # fallback to layer names if groupname in layernames: logging.debug('Layer "%s" was already added, skipping', groupname) else: layernames.add(groupname) else: logging.error('Unknown group/layer name "%s"', groupname) sys.exit(1) layers = { name: layers[name] for name in layernames } config['layers'] = layers extent = config.get('extent', None) if extent is not None: if isinstance(extent, list): config['extent'] = tuple(extent) if config.get('SRS', None) is None: raise BadConfiguration('Configured extent without SRS') return config # pylint: disable-next=invalid-name def getSourcePathLockFileName(path : str) -> str: """Return the name of the lockfile associated with a source path.""" return sha256(path.encode('utf-8')).hexdigest() + '.lck' def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str: """Format a number of bytes to a SI unit""" if n < threshold: return f'{n}\u202FB' if n < threshold * 1024: return f'{n/1024:.{precision}f}\u202FkiB' if n < threshold * 1048576: return f'{n/1048576:.{precision}f}\u202FMiB' return f'{n/1073741824:.{precision}f}\u202FGiB' def format_time(ts : float, precision : int = 3) -> str: """Format a timestamp to HH:MM:SS.fff""" w = 2 if precision == 0 else precision + 3 ts = round(ts, precision) m = math.floor(ts/60.) s = ts - 60. * m h, m = divmod(m, 60) return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}' def escape_identifier(identifier : str) -> str: """Escape the given identifier, cf. swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id().""" if identifier is None or '\x00' in identifier: raise RuntimeError(f'Invalid identifier "{identifier}"') # SQL:1999 delimited identifier return '"' + identifier.replace('"', '""') + '"' def escape_literal_str(literal : str) -> str: """Escape the given character string literal, cf. swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal().""" if literal is None or '\x00' in literal: raise RuntimeError(f'Invalid literal "{literal}"') # SQL:1999 character string literal return '\'' + literal.replace('\'', '\'\'') + '\'' ###### # The function definitions below are taken from cpython's source code # and augmented with dir_fd. # An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)). def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool: """Is a path a directory? (From genericpath.py.)""" try: st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return S_ISDIR(st.st_mode) def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool: """Does a path exist? (From genericpath.py.)""" try: os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return True def makedirs(name : str, mode : int = 0o777, exist_ok : bool = False, dir_fd : Optional[int] = None) -> None: """Create a leaf directory and all intermediate ones. (From os.py.)""" head, tail = os_path.split(name) if not tail: head, tail = os_path.split(head) if head and tail and not exists(head, dir_fd=dir_fd): try: makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd) except FileExistsError: # Defeats race condition when another thread created the path pass cdir = os_curdir if isinstance(tail, bytes): cdir = bytes(os_curdir, 'ASCII') if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists return logging.debug('mkdir("%s", 0%o)', name, mode) try: os.mkdir(name, mode, dir_fd=dir_fd) except OSError: # Cannot rely on checking for EEXIST, since the operating system # could give priority to other errors like EACCES or EROFS if not exist_ok or not isdir(name, dir_fd=dir_fd): raise