about summary refs log tree commit diff stats
path: root/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'common.py')
-rw-r--r--	common.py	359
1 file changed, 208 insertions, 151 deletions
diff --git a/common.py b/common.py
index 78fa4fd..74cd748 100644
--- a/common.py
+++ b/common.py
@@ -1,6 +1,6 @@
#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (common module)
-# Copyright © 2024 Guilhem Moulin <info@guilhem.se>
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -16,146 +16,131 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
-import os, sys
-from os import path
+# pylint: disable=missing-module-docstring
+
+import os
+from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep
+import sys
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
-from urllib.parse import urlparse, urlunparse
from stat import S_ISDIR
-from math import modf
-from xdg.BaseDirectory import xdg_config_home
+import math
import logging
+from typing import Any, Iterator, Optional, Never, TextIO
+from hashlib import sha256
+
+from xdg.BaseDirectory import xdg_config_home
import yaml
-def init_logger(app=__file__, level=logging.WARNING):
+def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.Logger:
+ """Initialize the logger"""
+
log_fmt = logging.Formatter('%(levelname)s: %(message)s')
log = logging.getLogger()
log.setLevel(level)
- if os.getenv('SYSTEMD_EXEC_PID', None) is None or os.getenv('JOURNAL_STREAM', None) is None:
+ if (os.getenv('SYSTEMD_EXEC_PID', None) is None
+ or os.getenv('INVOCATION_ID', None) is None
+ or os.getenv('JOURNAL_STREAM', None) is None):
ch = logging.StreamHandler()
else:
# started in systemd, use journald for filtering incl. coloring
- from systemd.journal import JournalHandler
+ from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel
ch = JournalHandler(SYSLOG_IDENTIFIER=app)
ch.setFormatter(log_fmt)
log.addHandler(ch)
return log
-def load_config(path=None, groupnames=None):
- if path is None:
- for p in [Path(),
- Path(xdg_config_home).joinpath('webmap'),
- PosixPath('/etc').joinpath('webmap')]:
- p = str(p.joinpath('config.yml'))
- if os.path.exists(p):
- path = p
- break
- if path is None:
- raise Exception('Could not find configuration file')
- with open(path, 'r') as fp:
- config = yaml.safe_load(fp)
- layers = config.get('layers', {})
+class MissingConfiguration(Exception):
+ """Exception raised when no configuration file could be found"""
+ def __init__(self, name : str) -> Never:
+ super().__init__(f'Could not find configuration file {name}')
- # validate sources
- destinations = {}
+class BadConfiguration(Exception):
+ """Exception raised when there is a bad configuration"""
+ def __init__(self, message : str, config_path : Optional[Path] = None) -> Never:
+ if config_path is not None:
+ message = str(config_path) + ': ' + message
+ super().__init__(message)
+
+def open_config(filename : str = 'config.yml', appname : str = 'webmap') -> TextIO:
+ """Open the configuration file"""
+ dirs = [
+ Path(),
+ Path(xdg_config_home).joinpath(appname),
+ PosixPath('/etc').joinpath(appname)
+ ]
+ for d in dirs:
+ p = d.joinpath(filename)
+ try:
+ return p.open(mode='r', encoding='utf-8')
+ except (FileNotFoundError, PermissionError) as e:
+ logging.debug('Ignoring exception %s', str(e))
+ raise MissingConfiguration(filename)
+
+def load_config(path : Optional[Path] = None) -> dict[str, Any]:
+ """Load configuration file"""
+ fp = open_config() if path is None else path.open(mode='r', encoding='utf-8')
+ try:
+ return yaml.safe_load(fp)
+ finally:
+ fp.close()
+
+def layers_in_group(groupname : str, patterns : str|list[str],
+ layernames : set[str]) -> Iterator[str]:
+ """Get layer names matching the given patterns"""
+ if isinstance(patterns, str):
+ patterns = [patterns]
+ for pat in patterns:
+ has_match = False
+ for layername in layernames:
+ if fnmatchcase(layername, pat):
+ yield layername
+ has_match = True
+ if has_match:
+ continue
+ if pat in layernames:
+ # fallback to exact match
+ yield pat
+ else:
+ logging.warning('Pattern "%s" in group "%s" does not match anything', pat, groupname)
+
+def parse_config(path : Optional[Path] = None,
+ groupnames : Optional[list[str]] = None) -> dict[str, Any]:
+ """Parse configuration file"""
+ config = load_config(path)
+
+ layers = config.get('layers', {})
for name, layerdefs in layers.items():
if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
layers[name] = { 'sources': [layerdefs] }
- for k in ['description', 'create']:
+ for k in ('description', 'create', 'publish'):
if k in layerdefs:
layers[name][k] = layerdefs.pop(k)
layerdefs = layers[name]
- if 'sources' not in layerdefs:
- raise Exception(f'Layer "{name}" does not have any source receipe')
-
- for sourcedef in layerdefs.get('sources', []):
- source = sourcedef.get('source', None)
- if source is None:
- continue
- download = source.get('download', None)
- if download is None:
- url = None
- dl_module = None
- elif isinstance(download, str):
- url = download
- dl_module = None
- source['download'] = download = { 'url': url }
- else:
- url = download.get('url', None)
- dl_module = download.get('module', None)
- if url is None:
- urlp = None
- else:
- urlp = urlparse(url)
- if urlp is None:
- raise Exception(f'urlparse({url}) failed')
-
- cache = source.get('cache', None)
- if cache is None or isinstance(cache, str):
- source['cache'] = { 'path': cache }
- else:
- cache = cache.get('path', None)
-
- if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
- # infer filename from the source URL
- if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
- raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
- p = PosixPath(urlp.path)
- if p is None or p.name is None or p.name == '':
- raise Exception(f'Invalid PosixPath({urlp.path})')
- if cache is None or cache == '':
- cache = Path()
- else:
- cache = Path(cache)
- cache = cache.joinpath(p.name)
- else:
- cache = Path(cache)
- source['cache']['path'] = cache
-
- v = { 'url': urlp, 'module': dl_module }
- if cache in destinations and destinations[cache] != v:
- # allow destination conflicts, but only when the source URL and module match
- raise Exception(f'Destination conflict for layer "{name}"')
- destinations[cache] = v
-
# filter layers that are not of interest
if groupnames is not None:
- layernames = []
+ layernames = set()
+ layernames_all = set(layers.keys())
layer_groups = config.get('layer-groups', {})
for groupname in groupnames:
- if groupname not in layer_groups:
- if groupname in layers:
- # fallback to layer names
- layernames.append(groupname)
+ if groupname in layer_groups:
+ for name in layers_in_group(groupname, layer_groups[groupname], layernames_all):
+ if name in layernames:
+ logging.debug('Layer "%s" was already added, skipping', name)
+ else:
+ layernames.add(name)
+ elif groupname in layers:
+ # fallback to layer names
+ if groupname in layernames:
+ logging.debug('Layer "%s" was already added, skipping', groupname)
else:
- logging.error('Unknown group/layer name "%s"', groupname)
- exit(1)
+ layernames.add(groupname)
else:
- patterns = layer_groups[groupname]
- if isinstance(patterns, str):
- patterns = [patterns]
- for pat in patterns:
- has_match = False
- for layername in layers:
- if fnmatchcase(layername, pat):
- if layername in layernames:
- logging.debug('Layer "%s" was already added, skipping', layername)
- else:
- layernames.append(layername)
- has_match = True
- if has_match:
- pass
- elif pat in layers:
- # fallback to exact match
- if pat in layernames:
- logging.debug('Layer "%s" was already added, skipping', pat)
- else:
- layernames.append(pat)
- else:
- logging.warning('Group name "%s" does not match anything', groupname)
+ logging.error('Unknown group/layer name "%s"', groupname)
+ sys.exit(1)
layers = { name: layers[name] for name in layernames }
@@ -166,40 +151,113 @@ def load_config(path=None, groupnames=None):
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
- raise Exception('Configured extent without SRS')
+ raise BadConfiguration('Configured extent without SRS')
+
+ return config
+
+def _check_key_type(k : str, v : str, known_keys : list[type, tuple[set[str]]]) -> bool:
+ for t, ks in known_keys:
+ if k in ks and isinstance(v, t):
+ return True
+ return False
+
+def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]:
+ """Parse and validate the "downloads" section from the configuration dictionary"""
- sys.modules[__name__].config = config
+ if not isinstance(downloads, list):
+ raise BadConfiguration(f'Invalid download recipe: {downloads}')
+
+ known_keys = [
+ (str, {'path', 'url'}),
+ (int, {'max-age', 'max-size'}),
+ ]
+
+ destinations = {}
+ known_keys_set = {k for _,ks in known_keys for k in ks}
+ for dl in downloads:
+ if 'url' in dl:
+ dls = [dl]
+ elif 'basedir' in dl and 'baseurl' in dl and 'files' in dl and 'path' not in dl:
+ dls = []
+ for filename in dl['files']:
+ dl2 = {
+ 'path' : os_path.join(dl['basedir'], filename),
+ 'url' : dl['baseurl'] + filename
+ }
+ for k, v in dl.items():
+ if k not in ('basedir', 'baseurl', 'files'):
+ dl2[k] = v
+ dls.append(dl2)
+ else:
+ raise BadConfiguration(f'Invalid download recipe: {dl}')
+
+ for dl in dls:
+ path = dl.get('path', None)
+ if path is None or path in ('', os_curdir, os_pardir) or path.endswith(os_sep):
+ raise BadConfiguration(f'Invalid destination path "{path}"')
+ if path in destinations:
+ raise BadConfiguration(f'Duplicate download recipe for "{path}"')
+ dl2 = {}
+ for k, v in dl.items():
+ if k == 'path':
+ continue
+ if k not in known_keys_set:
+ logging.warning('Ignoring unknown setting "%s" in download recipe for "%s"',
+ k, path)
+ elif not _check_key_type(k, v, known_keys):
+ logging.warning('Ignoring setting "%s" in download recipe for "%s"'
+ ' (invalid type)', k, path)
+ else:
+ dl2[k] = v
+ destinations[path] = dl2
-def format_bytes(n):
- if n < 768:
+ return destinations
+
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+ """Return the name of the lockfile associated with a source path."""
+ return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
+def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
+ """Format a number of bytes to a SI unit"""
+
+ if n < threshold:
return f'{n}\u202FB'
- elif n < 768*1024:
- return f'{n/1024:.2f}\u202FkiB'
- elif n < 768*1024*1024:
- return f'{n/1048576:.2f}\u202FMiB'
- else:
- return f'{n/1073741824:.2f}\u202FGiB'
+ if n < threshold * 1024:
+ return f'{n/1024:.{precision}f}\u202FkiB'
+ if n < threshold * 1048576:
+ return f'{n/1048576:.{precision}f}\u202FMiB'
+ return f'{n/1073741824:.{precision}f}\u202FGiB'
+
+def format_time(ts : float, precision : int = 3) -> str:
+ """Format a timestamp to HH:MM:SS.fff"""
-def format_time(s):
- fs, s = modf(s)
- m, s = divmod(int(s), 60)
+ w = 2 if precision == 0 else precision + 3
+ ts = round(ts, precision)
+ m = math.floor(ts/60.)
+ s = ts - 60. * m
h, m = divmod(m, 60)
- return f'{h:02d}:{m:02d}:{s + fs:06.3f}'
+ return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
-# Return a boolean indicating whether the installer GDAL version is
-# greater than or equal to the provider (maj, min, rev) triplet.
-def gdal_version_min(maj=0, min=0, rev=0):
- if maj < 1 or (maj == 1 and min < 10):
- # GDAL_VERSION_NUM() macro was changed in 1.10. That version
- # was released in 2013 so we blindly assume the installer
- # version is more recent
- return True
+def escape_identifier(identifier : str) -> str:
+ """Escape the given identifier, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
- from osgeo import gdal
- version_cur = int(gdal.VersionInfo());
- # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
- version_min = maj*1000000 + min*10000 + rev*100
- return version_min <= version_cur
+ if identifier is None or '\x00' in identifier:
+ raise RuntimeError(f'Invalid identifier "{identifier}"')
+
+ # SQL:1999 delimited identifier
+ return '"' + identifier.replace('"', '""') + '"'
+
+def escape_literal_str(literal : str) -> str:
+ """Escape the given character string literal, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+ if literal is None or '\x00' in literal:
+ raise RuntimeError(f'Invalid literal "{literal}"')
+
+ # SQL:1999 character string literal
+ return '\'' + literal.replace('\'', '\'\'') + '\''
######
@@ -207,43 +265,42 @@ def gdal_version_min(maj=0, min=0, rev=0):
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).
-# Is a path a directory?
-# (From genericpath.py.)
-def isdir(path, dir_fd=None, follow_symlinks=True):
+def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Is a path a directory? (From genericpath.py.)"""
try:
st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return S_ISDIR(st.st_mode)
-# Does a path exist?
-# (From genericpath.py.)
-def exists(path, dir_fd=None, follow_symlinks=True):
+def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Does a path exist? (From genericpath.py.)"""
try:
os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return True
-# Create a leaf directory and all intermediate ones.
-# (From os.py.)
-def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
- head, tail = path.split(name)
+def makedirs(name : str, mode : int = 0o777,
+ exist_ok : bool = False,
+ dir_fd : Optional[int] = None) -> None:
+ """Create a leaf directory and all intermediate ones. (From os.py.)"""
+
+ head, tail = os_path.split(name)
if not tail:
- head, tail = path.split(head)
+ head, tail = os_path.split(head)
if head and tail and not exists(head, dir_fd=dir_fd):
try:
- makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
+ makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd)
except FileExistsError:
# Defeats race condition when another thread created the path
pass
- cdir = os.curdir
+ cdir = os_curdir
if isinstance(tail, bytes):
- cdir = bytes(os.curdir, 'ASCII')
+ cdir = bytes(os_curdir, 'ASCII')
if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
return
- if logging is not None:
- logging.debug('mkdir("%s", 0%o)', name, mode)
+ logging.debug('mkdir("%s", 0%o)', name, mode)
try:
os.mkdir(name, mode, dir_fd=dir_fd)
except OSError: