Diffstat (limited to 'common.py')
-rw-r--r--  common.py  359
1 file changed, 208 insertions, 151 deletions
@@ -1,6 +1,6 @@
 #----------------------------------------------------------------------
 # Backend utilities for the Klimatanalys Norr project (common module)
-# Copyright © 2024 Guilhem Moulin <info@guilhem.se>
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -16,146 +16,131 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 #----------------------------------------------------------------------
 
-import os, sys
-from os import path
+# pylint: disable=missing-module-docstring
+
+import os
+from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep
+import sys
 from fnmatch import fnmatchcase
 from pathlib import Path, PosixPath
-from urllib.parse import urlparse, urlunparse
 from stat import S_ISDIR
-from math import modf
-from xdg.BaseDirectory import xdg_config_home
+import math
 import logging
+from typing import Any, Iterator, Optional, Never, TextIO
+from hashlib import sha256
+
+from xdg.BaseDirectory import xdg_config_home
 import yaml
 
-def init_logger(app=__file__, level=logging.WARNING):
+def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.Logger:
+    """Initialize the logger"""
+
     log_fmt = logging.Formatter('%(levelname)s: %(message)s')
     log = logging.getLogger()
     log.setLevel(level)
 
-    if os.getenv('SYSTEMD_EXEC_PID', None) is None or os.getenv('JOURNAL_STREAM', None) is None:
+    if (os.getenv('SYSTEMD_EXEC_PID', None) is None
+            or os.getenv('INVOCATION_ID', None) is None
+            or os.getenv('JOURNAL_STREAM', None) is None):
         ch = logging.StreamHandler()
     else:
         # started in systemd, use journald for filtering incl. coloring
-        from systemd.journal import JournalHandler
+        from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel
         ch = JournalHandler(SYSLOG_IDENTIFIER=app)
 
     ch.setFormatter(log_fmt)
    log.addHandler(ch)
     return log
 
-def load_config(path=None, groupnames=None):
-    if path is None:
-        for p in [Path(),
-                  Path(xdg_config_home).joinpath('webmap'),
-                  PosixPath('/etc').joinpath('webmap')]:
-            p = str(p.joinpath('config.yml'))
-            if os.path.exists(p):
-                path = p
-                break
-        if path is None:
-            raise Exception('Could not find configuration file')
-    with open(path, 'r') as fp:
-        config = yaml.safe_load(fp)
-    layers = config.get('layers', {})
+class MissingConfiguration(Exception):
+    """Exception raised when no configuration file could be found"""
+    def __init__(self, name : str) -> Never:
+        super().__init__(f'Could not find configuration file {name}')
 
-    # validate sources
-    destinations = {}
+class BadConfiguration(Exception):
+    """Exception raised when there is a bad configuration"""
+    def __init__(self, message : str, config_path : Optional[Path] = None) -> Never:
+        if config_path is not None:
+            message = str(config_path) + ': ' + message
+        super().__init__(message)
+
+def open_config(filename : str = 'config.yml', appname : str = 'webmap') -> TextIO:
+    """Open the configuration file"""
+    dirs = [
+        Path(),
+        Path(xdg_config_home).joinpath(appname),
+        PosixPath('/etc').joinpath(appname)
+    ]
+    for d in dirs:
+        p = d.joinpath(filename)
+        try:
+            return p.open(mode='r', encoding='utf-8')
+        except (FileNotFoundError, PermissionError) as e:
+            logging.debug('Ignoring exception %s', str(e))
+    raise MissingConfiguration(filename)
+
+def load_config(path : Optional[Path] = None) -> dict[str, Any]:
+    """Load configuration file"""
+    fp = open_config() if path is None else path.open(mode='r', encoding='utf-8')
+    try:
+        return yaml.safe_load(fp)
+    finally:
+        fp.close()
+
+def layers_in_group(groupname : str, patterns : str|list[str],
+                    layernames : set[str]) -> Iterator[str]:
+    """Get layer names matching the given patterns"""
+    if isinstance(patterns, str):
+        patterns = [patterns]
+    for pat in patterns:
+        has_match = False
+        for layername in layernames:
+            if fnmatchcase(layername, pat):
+                yield layername
+                has_match = True
+        if has_match:
+            continue
+        if pat in layernames:
+            # fallback to exact match
+            yield pat
+        else:
+            logging.warning('Pattern "%s" in group "%s" does not match anything', pat, groupname)
+
+def parse_config(path : Optional[Path] = None,
+                 groupnames : Optional[list[str]] = None) -> dict[str, Any]:
+    """Parse configuration file"""
+    config = load_config(path)
+
+    layers = config.get('layers', {})
     for name, layerdefs in layers.items():
         if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
             layers[name] = { 'sources': [layerdefs] }
-            for k in ['description', 'create']:
+            for k in ('description', 'create', 'publish'):
                 if k in layerdefs:
                     layers[name][k] = layerdefs.pop(k)
             layerdefs = layers[name]
 
-        if 'sources' not in layerdefs:
-            raise Exception(f'Layer "{name}" does not have any source receipe')
-
-        for sourcedef in layerdefs.get('sources', []):
-            source = sourcedef.get('source', None)
-            if source is None:
-                continue
-            download = source.get('download', None)
-            if download is None:
-                url = None
-                dl_module = None
-            elif isinstance(download, str):
-                url = download
-                dl_module = None
-                source['download'] = download = { 'url': url }
-            else:
-                url = download.get('url', None)
-                dl_module = download.get('module', None)
-            if url is None:
-                urlp = None
-            else:
-                urlp = urlparse(url)
-                if urlp is None:
-                    raise Exception(f'urlparse({url}) failed')
-
-            cache = source.get('cache', None)
-            if cache is None or isinstance(cache, str):
-                source['cache'] = { 'path': cache }
-            else:
-                cache = cache.get('path', None)
-
-            if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
-                # infer filename from the source URL
-                if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
-                    raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
-                p = PosixPath(urlp.path)
-                if p is None or p.name is None or p.name == '':
-                    raise Exception(f'Invalid PosixPath({urlp.path})')
-                if cache is None or cache == '':
-                    cache = Path()
-                else:
-                    cache = Path(cache)
-                cache = cache.joinpath(p.name)
-            else:
-                cache = Path(cache)
-            source['cache']['path'] = cache
-
-            v = { 'url': urlp, 'module': dl_module }
-            if cache in destinations and destinations[cache] != v:
-                # allow destination conflicts, but only when the source URL and module match
-                raise Exception(f'Destination conflict for layer "{name}"')
-            destinations[cache] = v
-
     # filter layers that are not of interest
     if groupnames is not None:
-        layernames = []
+        layernames = set()
+        layernames_all = set(layers.keys())
         layer_groups = config.get('layer-groups', {})
         for groupname in groupnames:
-            if groupname not in layer_groups:
-                if groupname in layers:
-                    # fallback to layer names
-                    layernames.append(groupname)
+            if groupname in layer_groups:
+                for name in layers_in_group(groupname, layer_groups[groupname], layernames_all):
+                    if name in layernames:
+                        logging.debug('Layer "%s" was already added, skipping', name)
+                    else:
+                        layernames.add(name)
+            elif groupname in layers:
+                # fallback to layer names
+                if groupname in layernames:
+                    logging.debug('Layer "%s" was already added, skipping', groupname)
                 else:
-                    logging.error('Unknown group/layer name "%s"', groupname)
-                    exit(1)
+                    layernames.add(groupname)
             else:
-                patterns = layer_groups[groupname]
-                if isinstance(patterns, str):
-                    patterns = [patterns]
-                for pat in patterns:
-                    has_match = False
-                    for layername in layers:
-                        if fnmatchcase(layername, pat):
-                            if layername in layernames:
-                                logging.debug('Layer "%s" was already added, skipping', layername)
-                            else:
-                                layernames.append(layername)
-                            has_match = True
-                    if has_match:
-                        pass
-                    elif pat in layers:
-                        # fallback to exact match
-                        if pat in layernames:
-                            logging.debug('Layer "%s" was already added, skipping', pat)
-                        else:
-                            layernames.append(pat)
-                    else:
-                        logging.warning('Group name "%s" does not match anything', groupname)
+                logging.error('Unknown group/layer name "%s"', groupname)
+                sys.exit(1)
 
         layers = { name: layers[name] for name in layernames }
@@ -166,40 +151,113 @@ def load_config(path=None, groupnames=None):
         if isinstance(extent, list):
             config['extent'] = tuple(extent)
         if config.get('SRS', None) is None:
-            raise Exception('Configured extent without SRS')
+            raise BadConfiguration('Configured extent without SRS')
+
+    return config
+
+def _check_key_type(k : str, v : str, known_keys : list[type, tuple[set[str]]]) -> bool:
+    for t, ks in known_keys:
+        if k in ks and isinstance(v, t):
+            return True
+    return False
+
+def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]:
+    """Parse and validate the "downloads" section from the configuration dictionary"""
 
-    sys.modules[__name__].config = config
+    if not isinstance(downloads, list):
+        raise BadConfiguration(f'Invalid download recipe: {downloads}')
+
+    known_keys = [
+        (str, {'path', 'url'}),
+        (int, {'max-age', 'max-size'}),
+    ]
+
+    destinations = {}
+    known_keys_set = {k for _,ks in known_keys for k in ks}
+    for dl in downloads:
+        if 'url' in dl:
+            dls = [dl]
+        elif 'basedir' in dl and 'baseurl' in dl and 'files' in dl and 'path' not in dl:
+            dls = []
+            for filename in dl['files']:
+                dl2 = {
+                    'path' : os_path.join(dl['basedir'], filename),
+                    'url' : dl['baseurl'] + filename
+                }
+                for k, v in dl.items():
+                    if k not in ('basedir', 'baseurl', 'files'):
+                        dl2[k] = v
+                dls.append(dl2)
+        else:
+            raise BadConfiguration(f'Invalid download recipe: {dl}')
+
+        for dl in dls:
+            path = dl.get('path', None)
+            if path is None or path in ('', os_curdir, os_pardir) or path.endswith(os_sep):
+                raise BadConfiguration(f'Invalid destination path "{path}"')
+            if path in destinations:
+                raise BadConfiguration(f'Duplicate download recipe for "{path}"')
+            dl2 = {}
+            for k, v in dl.items():
+                if k == 'path':
+                    continue
+                if k not in known_keys_set:
+                    logging.warning('Ignoring unknown setting "%s" in download recipe for "%s"',
+                                    k, path)
+                elif not _check_key_type(k, v, known_keys):
+                    logging.warning('Ignoring setting "%s" in download recipe for "%s"'
+                                    ' (invalid type)', k, path)
+                else:
+                    dl2[k] = v
+            destinations[path] = dl2
 
-def format_bytes(n):
-    if n < 768:
+    return destinations
+
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+    """Return the name of the lockfile associated with a source path."""
+    return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
+def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
+    """Format a number of bytes to a SI unit"""
+
+    if n < threshold:
         return f'{n}\u202FB'
-    elif n < 768*1024:
-        return f'{n/1024:.2f}\u202FkiB'
-    elif n < 768*1024*1024:
-        return f'{n/1048576:.2f}\u202FMiB'
-    else:
-        return f'{n/1073741824:.2f}\u202FGiB'
+    if n < threshold * 1024:
+        return f'{n/1024:.{precision}f}\u202FkiB'
+    if n < threshold * 1048576:
+        return f'{n/1048576:.{precision}f}\u202FMiB'
+    return f'{n/1073741824:.{precision}f}\u202FGiB'
+
+def format_time(ts : float, precision : int = 3) -> str:
+    """Format a timestamp to HH:MM:SS.fff"""
 
-def format_time(s):
-    fs, s = modf(s)
-    m, s = divmod(int(s), 60)
+    w = 2 if precision == 0 else precision + 3
+    ts = round(ts, precision)
+    m = math.floor(ts/60.)
+    s = ts - 60. * m
     h, m = divmod(m, 60)
-    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'
+    return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
 
-# Return a boolean indicating whether the installer GDAL version is
-# greater than or equal to the provider (maj, min, rev) triplet.
-def gdal_version_min(maj=0, min=0, rev=0):
-    if maj < 1 or (maj == 1 and min < 10):
-        # GDAL_VERSION_NUM() macro was changed in 1.10. That version
-        # was released in 2013 so we blindly assume the installer
-        # version is more recent
-        return True
+def escape_identifier(identifier : str) -> str:
+    """Escape the given identifier, cf.
    swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
 
-    from osgeo import gdal
-    version_cur = int(gdal.VersionInfo());
-    # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
-    version_min = maj*1000000 + min*10000 + rev*100
-    return version_min <= version_cur
+    if identifier is None or '\x00' in identifier:
+        raise RuntimeError(f'Invalid identifier "{identifier}"')
+
+    # SQL:1999 delimited identifier
+    return '"' + identifier.replace('"', '""') + '"'
+
+def escape_literal_str(literal : str) -> str:
+    """Escape the given character string literal, cf.
+    swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+    if literal is None or '\x00' in literal:
+        raise RuntimeError(f'Invalid literal "{literal}"')
+
+    # SQL:1999 character string literal
+    return '\'' + literal.replace('\'', '\'\'') + '\''
 
 ######
@@ -207,43 +265,42 @@ def gdal_version_min(maj=0, min=0, rev=0):
 # and augmented with dir_fd.
 # An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).
 
-# Is a path a directory?
-# (From genericpath.py.)
-def isdir(path, dir_fd=None, follow_symlinks=True):
+def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+    """Is a path a directory? (From genericpath.py.)"""
     try:
         st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
     except (OSError, ValueError):
         return False
     return S_ISDIR(st.st_mode)
 
-# Does a path exist?
-# (From genericpath.py.)
-def exists(path, dir_fd=None, follow_symlinks=True):
+def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
    """Does a path exist? (From genericpath.py.)"""
     try:
         os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
     except (OSError, ValueError):
         return False
     return True
 
-# Create a leaf directory and all intermediate ones.
-# (From os.py.)
-def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
-    head, tail = path.split(name)
+def makedirs(name : str, mode : int = 0o777,
+             exist_ok : bool = False,
+             dir_fd : Optional[int] = None) -> None:
+    """Create a leaf directory and all intermediate ones. (From os.py.)"""
+
+    head, tail = os_path.split(name)
     if not tail:
-        head, tail = path.split(head)
+        head, tail = os_path.split(head)
     if head and tail and not exists(head, dir_fd=dir_fd):
         try:
-            makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
+            makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd)
         except FileExistsError:
            # Defeats race condition when another thread created the path
            pass
-        cdir = os.curdir
+        cdir = os_curdir
        if isinstance(tail, bytes):
-            cdir = bytes(os.curdir, 'ASCII')
+            cdir = bytes(os_curdir, 'ASCII')
        if tail == cdir:           # xxx/newdir/. exists if xxx/newdir exists
            return
-    if logging is not None:
-        logging.debug('mkdir("%s", 0%o)', name, mode)
+    logging.debug('mkdir("%s", 0%o)', name, mode)
     try:
         os.mkdir(name, mode, dir_fd=dir_fd)
     except OSError:
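
For orientation, a minimal usage sketch of the new configuration entry points. The group name 'wind', the top-level 'downloads' key and the call site are illustrative assumptions, not something this commit defines:

    import logging
    from common import init_logger, parse_config, parse_config_dl

    init_logger(app='webmap-example', level=logging.INFO)

    # With path=None, parse_config() looks for config.yml in the current
    # directory, $XDG_CONFIG_HOME/webmap and /etc/webmap, and raises
    # MissingConfiguration if none is readable.  An unknown group/layer
    # name is logged as an error and terminates the process via sys.exit(1).
    config = parse_config(groupnames=['wind'])
    for name, layerdef in config.get('layers', {}).items():
        print(name, layerdef.get('description', ''))

    # Hypothetical 'downloads' section, validated and expanded separately.
    downloads = parse_config_dl(config.get('downloads', []))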
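The behaviour of the reworked formatting and lockfile helpers, with the values below computed from the implementation shown in the diff:

    from common import format_bytes, format_time, getSourcePathLockFileName

    # format_bytes() switches units at `threshold` (default 768) and takes a
    # configurable precision; the unit is separated by U+202F (narrow no-break space).
    format_bytes(512)                        # '512\u202fB'
    format_bytes(1536)                       # '1.50\u202fkiB'
    format_bytes(5 * 1024**2, precision=1)   # '5.0\u202fMiB'

    # format_time() now rounds to the requested precision before splitting off
    # the minutes, so 59.9996 s renders as '00:01:00.000' instead of the
    # '00:00:60.000' produced by the removed modf()-based version.
    format_time(3723.456)                    # '01:02:03.456'
    format_time(59.9996)                     # '00:01:00.000'

    # Lock file names are the SHA-256 hex digest of the source path plus '.lck'.
    getSourcePathLockFileName('sgu/a.zip')   # 64 hex characters + '.lck'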
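A small sketch of how the new SQL quoting helpers compose; the table and owner values are made up:

    from common import escape_identifier, escape_literal_str

    table = 'layer "2025"'    # identifier containing a double quote
    owner = "guilhem's"       # literal containing a single quote

    sql = ('SELECT * FROM ' + escape_identifier(table)
           + ' WHERE owner = ' + escape_literal_str(owner))
    # SELECT * FROM "layer ""2025""" WHERE owner = 'guilhem''s'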
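And a sketch of how parse_config_dl() expands a basedir/baseurl/files recipe; the host name, file names and the 'frequency' key are placeholders:

    from common import parse_config_dl

    recipes = [{
        'basedir': 'sgu',
        'baseurl': 'https://downloads.example.org/sgu/',
        'files': ['a.zip', 'b.zip'],
        'max-age': 86400,        # known int setting, carried over to each entry
        'frequency': 'weekly',   # unknown setting, dropped with a warning
    }]

    parse_config_dl(recipes)
    # {'sgu/a.zip': {'url': 'https://downloads.example.org/sgu/a.zip', 'max-age': 86400},
    #  'sgu/b.zip': {'url': 'https://downloads.example.org/sgu/b.zip', 'max-age': 86400}}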