diff options
Diffstat (limited to 'common.py')
-rw-r--r-- | common.py | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/common.py b/common.py new file mode 100644 index 0000000..e4456af --- /dev/null +++ b/common.py @@ -0,0 +1,171 @@ +import os, sys +from os import path +from fnmatch import fnmatchcase +from pathlib import Path, PosixPath +from urllib.parse import urlparse, urlunparse +from stat import S_ISDIR +from xdg.BaseDirectory import xdg_config_home +import logging +import yaml +import __main__ as main + +def load_config(path=None, groupnames=None): + main_script = os.path.basename(main.__file__) + if path is None: + for p in [Path(), + Path(xdg_config_home).joinpath('webmap'), + PosixPath('/etc').joinpath('webmap')]: + p = p.joinpath('config.yml') + if p.exists(): + path = str(p) + break + with open(path, 'r') as fp: + config = yaml.safe_load(fp) + layers = config.get('layers', {}) + + # validate sources + destinations = {} + for name, layer in layers.items(): + if isinstance(layer, dict): + layers[name] = layer = [layer] + + for sourcedef in layer: + source = sourcedef.get('source', None) + if source is None: + continue + download = source.get('download', None) + if download is None: + url = None + script = None + elif isinstance(download, str): + url = download + script = None + source['download'] = download = { 'url': url } + else: + url = download.get('url', None) + script = download.get('script', None) + if url is None: + urlp = None + else: + urlp = urlparse(url) + if urlp is None: + raise Exception(f'urlparse({url}) failed') + + cache = source.get('cache', None) + if cache is None or isinstance(cache, str): + source['cache'] = { 'path': cache } + else: + cache = cache.get('path', None) + + if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep): + # infer filename from the source URL + if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'): + raise Exception(f'Layer "{name}": Could not infer filename from URL {url}') + p = PosixPath(urlp.path) + if p is None or p.name is None or p.name == '': + raise Exception(f'Invalid PosixPath({urlp.path})') + if cache is None or cache == '': + cache = Path() + else: + cache = Path(cache) + cache = cache.joinpath(p.name) + else: + cache = Path(cache) + source['cache']['path'] = cache + + v = { 'url': urlp, 'script': main_script if script is None else script } + if cache in destinations and destinations[cache] != v: + # allow destination conflicts, but only when the source URL and script match + raise Exception(f'Destination conflict for layer "{name}"') + destinations[cache] = v + + # filter layers that are not of interest + if groupnames is not None: + layernames = [] + layer_groups = config.get('layer-groups', {}) + for groupname in groupnames: + if groupname not in layer_groups: + if groupname in layers: + # fallback to layer names + layernames.append(groupname) + else: + logging.error('Unknown group/layer name "%s"', groupname) + exit(1) + else: + patterns = layer_groups[groupname] + if isinstance(patterns, str): + patterns = [patterns] + for pat in patterns: + has_match = False + for layername in layers: + if fnmatchcase(layername, pat): + if layername in layernames: + logging.debug('Layer "%s" was already added, skipping', layername) + else: + layernames.append(layername) + has_match = True + if has_match: + pass + elif pat in layers: + # fallback to exact match + if pat in layernames: + logging.debug('Layer "%s" was already added, skipping', pat) + else: + layernames.append(pat) + else: + logging.warning('Group name "%s" does not match anything', groupname) + + layers = { name: layers[name] for name in layernames } + + config['layers'] = layers + sys.modules[__name__].config = config + + +###### +# The function definitions below are taken from cpython's source code +# and augmented with dir_fd. + +# Is a path a directory? +# (From genericpath.py.) +def isdir(path, dir_fd=None, follow_symlinks=True): + try: + st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) + except (OSError, ValueError): + return False + return S_ISDIR(st.st_mode) + +# Does a path exist? +# (From genericpath.py.) +def exists(path, dir_fd=None, follow_symlinks=True): + try: + os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) + except (OSError, ValueError): + return False + return True + +# Create a leaf directory and all intermediate ones. +# (From os.py.) +def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None): + head, tail = path.split(name) + if not tail: + head, tail = path.split(head) + if head and tail and not exists(head, dir_fd=dir_fd): + try: + makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging) + except FileExistsError: + # Defeats race condition when another thread created the path + pass + cdir = os.curdir + if isinstance(tail, bytes): + cdir = bytes(os.curdir, 'ASCII') + if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists + return + if logging is not None: + logging.debug('mkdir("%s", 0%o)', name, mode) + try: + os.mkdir(name, mode, dir_fd=dir_fd) + except OSError: + # Cannot rely on checking for EEXIST, since the operating system + # could give priority to other errors like EACCES or EROFS + if not exist_ok or not isdir(name, dir_fd=dir_fd): + raise |