import os, sys from os import path from fnmatch import fnmatchcase from pathlib import Path, PosixPath from urllib.parse import urlparse, urlunparse from stat import S_ISDIR from xdg.BaseDirectory import xdg_config_home import logging import yaml import __main__ as main def load_config(path=None, groupnames=None): main_script = os.path.basename(main.__file__) if path is None: for p in [Path(), Path(xdg_config_home).joinpath('webmap'), PosixPath('/etc').joinpath('webmap')]: p = p.joinpath('config.yml') if p.exists(): path = str(p) break with open(path, 'r') as fp: config = yaml.safe_load(fp) layers = config.get('layers', {}) # validate sources destinations = {} for name, layer in layers.items(): if isinstance(layer, dict): layers[name] = layer = [layer] for sourcedef in layer: source = sourcedef.get('source', None) if source is None: continue download = source.get('download', None) if download is None: url = None script = None elif isinstance(download, str): url = download script = None source['download'] = download = { 'url': url } else: url = download.get('url', None) script = download.get('script', None) if url is None: urlp = None else: urlp = urlparse(url) if urlp is None: raise Exception(f'urlparse({url}) failed') cache = source.get('cache', None) if cache is None or isinstance(cache, str): source['cache'] = { 'path': cache } else: cache = cache.get('path', None) if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep): # infer filename from the source URL if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'): raise Exception(f'Layer "{name}": Could not infer filename from URL {url}') p = PosixPath(urlp.path) if p is None or p.name is None or p.name == '': raise Exception(f'Invalid PosixPath({urlp.path})') if cache is None or cache == '': cache = Path() else: cache = Path(cache) cache = cache.joinpath(p.name) else: cache = Path(cache) source['cache']['path'] = cache v = { 'url': urlp, 'script': main_script if script is None else script } if cache in destinations and destinations[cache] != v: # allow destination conflicts, but only when the source URL and script match raise Exception(f'Destination conflict for layer "{name}"') destinations[cache] = v # filter layers that are not of interest if groupnames is not None: layernames = [] layer_groups = config.get('layer-groups', {}) for groupname in groupnames: if groupname not in layer_groups: if groupname in layers: # fallback to layer names layernames.append(groupname) else: logging.error('Unknown group/layer name "%s"', groupname) exit(1) else: patterns = layer_groups[groupname] if isinstance(patterns, str): patterns = [patterns] for pat in patterns: has_match = False for layername in layers: if fnmatchcase(layername, pat): if layername in layernames: logging.debug('Layer "%s" was already added, skipping', layername) else: layernames.append(layername) has_match = True if has_match: pass elif pat in layers: # fallback to exact match if pat in layernames: logging.debug('Layer "%s" was already added, skipping', pat) else: layernames.append(pat) else: logging.warning('Group name "%s" does not match anything', groupname) layers = { name: layers[name] for name in layernames } config['layers'] = layers sys.modules[__name__].config = config ###### # The function definitions below are taken from cpython's source code # and augmented with dir_fd. # Is a path a directory? # (From genericpath.py.) def isdir(path, dir_fd=None, follow_symlinks=True): try: st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return S_ISDIR(st.st_mode) # Does a path exist? # (From genericpath.py.) def exists(path, dir_fd=None, follow_symlinks=True): try: os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return True # Create a leaf directory and all intermediate ones. # (From os.py.) def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None): head, tail = path.split(name) if not tail: head, tail = path.split(head) if head and tail and not exists(head, dir_fd=dir_fd): try: makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging) except FileExistsError: # Defeats race condition when another thread created the path pass cdir = os.curdir if isinstance(tail, bytes): cdir = bytes(os.curdir, 'ASCII') if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists return if logging is not None: logging.debug('mkdir("%s", 0%o)', name, mode) try: os.mkdir(name, mode, dir_fd=dir_fd) except OSError: # Cannot rely on checking for EEXIST, since the operating system # could give priority to other errors like EACCES or EROFS if not exist_ok or not isdir(name, dir_fd=dir_fd): raise