aboutsummaryrefslogtreecommitdiffstats
path: root/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'common.py')
-rw-r--r--common.py80
1 files changed, 71 insertions, 9 deletions
diff --git a/common.py b/common.py
index 0035160..cbdc43c 100644
--- a/common.py
+++ b/common.py
@@ -19,14 +19,14 @@
# pylint: disable=missing-module-docstring
import os
-from os import path as os_path, curdir as os_curdir
+from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep
import sys
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
from stat import S_ISDIR
import math
import logging
-from typing import Any, Iterator, Optional, Never
+from typing import Any, Iterator, Optional, Never, TextIO
from hashlib import sha256
from xdg.BaseDirectory import xdg_config_home
@@ -64,8 +64,8 @@ class BadConfiguration(Exception):
message = str(config_path) + ': ' + message
super().__init__(message)
-def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
- """Return the configuration file path"""
+def open_config(filename : str = 'config.yml', appname : str = 'webmap') -> TextIO:
+ """Open the configuration file"""
dirs = [
Path(),
Path(xdg_config_home).joinpath(appname),
@@ -73,15 +73,19 @@ def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path
]
for d in dirs:
p = d.joinpath(filename)
- if p.exists():
- return p
+ try:
+ return p.open(mode='r', encoding='utf-8')
+ except (FileNotFoundError, PermissionError) as e:
+ logging.debug('Ignoring exception %s', str(e))
raise MissingConfiguration(filename)
def load_config(path : Optional[Path] = None) -> dict[str, Any]:
"""Load configuration file"""
- config_path = find_config() if path is None else path
- with config_path.open(mode='r', encoding='utf-8') as fp:
+ fp = open_config() if path is None else path.open(mode='r', encoding='utf-8')
+ try:
return yaml.safe_load(fp)
+ finally:
+ fp.close()
def layers_in_group(groupname : str, patterns : str|list[str],
layernames : set[str]) -> Iterator[str]:
@@ -111,7 +115,7 @@ def parse_config(path : Optional[Path] = None,
for name, layerdefs in layers.items():
if isinstance(layerdefs, dict) and 'sources' not in layerdefs:
layers[name] = { 'sources': [layerdefs] }
- for k in ('description', 'create', 'publish'):
+ for k in ('description', 'create', 'publish', 'type'):
if k in layerdefs:
layers[name][k] = layerdefs.pop(k)
layerdefs = layers[name]
@@ -151,6 +155,64 @@ def parse_config(path : Optional[Path] = None,
return config
+def _check_key_type(k : str, v : str, known_keys : list[type, tuple[set[str]]]) -> bool:
+ for t, ks in known_keys:
+ if k in ks and isinstance(v, t):
+ return True
+ return False
+
+def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]:
+ """Parse and validate the "downloads" section from the configuration dictionary"""
+
+ if not isinstance(downloads, list):
+ raise BadConfiguration(f'Invalid download recipe: {downloads}')
+
+ known_keys = [
+ (str, {'path', 'url'}),
+ (int, {'max-age', 'max-size'}),
+ ]
+
+ destinations = {}
+ known_keys_set = {k for _,ks in known_keys for k in ks}
+ for dl in downloads:
+ if 'url' in dl:
+ dls = [dl]
+ elif 'basedir' in dl and 'baseurl' in dl and 'files' in dl and 'path' not in dl:
+ dls = []
+ for filename in dl['files']:
+ dl2 = {
+ 'path' : os_path.join(dl['basedir'], filename),
+ 'url' : dl['baseurl'] + filename
+ }
+ for k, v in dl.items():
+ if k not in ('basedir', 'baseurl', 'files'):
+ dl2[k] = v
+ dls.append(dl2)
+ else:
+ raise BadConfiguration(f'Invalid download recipe: {dl}')
+
+ for dl in dls:
+ path = dl.get('path', None)
+ if path is None or path in ('', os_curdir, os_pardir) or path.endswith(os_sep):
+ raise BadConfiguration(f'Invalid destination path "{path}"')
+ if path in destinations:
+ raise BadConfiguration(f'Duplicate download recipe for "{path}"')
+ dl2 = {}
+ for k, v in dl.items():
+ if k == 'path':
+ continue
+ if k not in known_keys_set:
+ logging.warning('Ignoring unknown setting "%s" in download recipe for "%s"',
+ k, path)
+ elif not _check_key_type(k, v, known_keys):
+ logging.warning('Ignoring setting "%s" in download recipe for "%s"'
+ ' (invalid type)', k, path)
+ else:
+ dl2[k] = v
+ destinations[path] = dl2
+
+ return destinations
+
# pylint: disable-next=invalid-name
def getSourcePathLockFileName(path : str) -> str:
"""Return the name of the lockfile associated with a source path."""