diff options
Diffstat (limited to 'common.py')
| -rw-r--r-- | common.py | 80 |
1 files changed, 71 insertions, 9 deletions
@@ -19,14 +19,14 @@ # pylint: disable=missing-module-docstring import os -from os import path as os_path, curdir as os_curdir +from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep import sys from fnmatch import fnmatchcase from pathlib import Path, PosixPath from stat import S_ISDIR import math import logging -from typing import Any, Iterator, Optional, Never +from typing import Any, Iterator, Optional, Never, TextIO from hashlib import sha256 from xdg.BaseDirectory import xdg_config_home @@ -64,8 +64,8 @@ class BadConfiguration(Exception): message = str(config_path) + ': ' + message super().__init__(message) -def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path: - """Return the configuration file path""" +def open_config(filename : str = 'config.yml', appname : str = 'webmap') -> TextIO: + """Open the configuration file""" dirs = [ Path(), Path(xdg_config_home).joinpath(appname), @@ -73,15 +73,19 @@ def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path ] for d in dirs: p = d.joinpath(filename) - if p.exists(): - return p + try: + return p.open(mode='r', encoding='utf-8') + except (FileNotFoundError, PermissionError) as e: + logging.debug('Ignoring exception %s', str(e)) raise MissingConfiguration(filename) def load_config(path : Optional[Path] = None) -> dict[str, Any]: """Load configuration file""" - config_path = find_config() if path is None else path - with config_path.open(mode='r', encoding='utf-8') as fp: + fp = open_config() if path is None else path.open(mode='r', encoding='utf-8') + try: return yaml.safe_load(fp) + finally: + fp.close() def layers_in_group(groupname : str, patterns : str|list[str], layernames : set[str]) -> Iterator[str]: @@ -111,7 +115,7 @@ def parse_config(path : Optional[Path] = None, for name, layerdefs in layers.items(): if isinstance(layerdefs, dict) and 'sources' not in layerdefs: layers[name] = { 'sources': [layerdefs] } - for k in ('description', 'create', 'publish'): + for k in ('description', 'create', 'publish', 'type'): if k in layerdefs: layers[name][k] = layerdefs.pop(k) layerdefs = layers[name] @@ -151,6 +155,64 @@ def parse_config(path : Optional[Path] = None, return config +def _check_key_type(k : str, v : str, known_keys : list[type, tuple[set[str]]]) -> bool: + for t, ks in known_keys: + if k in ks and isinstance(v, t): + return True + return False + +def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]: + """Parse and validate the "downloads" section from the configuration dictionary""" + + if not isinstance(downloads, list): + raise BadConfiguration(f'Invalid download recipe: {downloads}') + + known_keys = [ + (str, {'path', 'url'}), + (int, {'max-age', 'max-size'}), + ] + + destinations = {} + known_keys_set = {k for _,ks in known_keys for k in ks} + for dl in downloads: + if 'url' in dl: + dls = [dl] + elif 'basedir' in dl and 'baseurl' in dl and 'files' in dl and 'path' not in dl: + dls = [] + for filename in dl['files']: + dl2 = { + 'path' : os_path.join(dl['basedir'], filename), + 'url' : dl['baseurl'] + filename + } + for k, v in dl.items(): + if k not in ('basedir', 'baseurl', 'files'): + dl2[k] = v + dls.append(dl2) + else: + raise BadConfiguration(f'Invalid download recipe: {dl}') + + for dl in dls: + path = dl.get('path', None) + if path is None or path in ('', os_curdir, os_pardir) or path.endswith(os_sep): + raise BadConfiguration(f'Invalid destination path "{path}"') + if path in destinations: + raise BadConfiguration(f'Duplicate download recipe for "{path}"') + dl2 = {} + for k, v in dl.items(): + if k == 'path': + continue + if k not in known_keys_set: + logging.warning('Ignoring unknown setting "%s" in download recipe for "%s"', + k, path) + elif not _check_key_type(k, v, known_keys): + logging.warning('Ignoring setting "%s" in download recipe for "%s"' + ' (invalid type)', k, path) + else: + dl2[k] = v + destinations[path] = dl2 + + return destinations + # pylint: disable-next=invalid-name def getSourcePathLockFileName(path : str) -> str: """Return the name of the lockfile associated with a source path.""" |
