aboutsummaryrefslogtreecommitdiffstats
path: root/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'common.py')
-rw-r--r--common.py171
1 files changed, 171 insertions, 0 deletions
diff --git a/common.py b/common.py
new file mode 100644
index 0000000..e4456af
--- /dev/null
+++ b/common.py
@@ -0,0 +1,171 @@
+import os, sys
+from os import path
+from fnmatch import fnmatchcase
+from pathlib import Path, PosixPath
+from urllib.parse import urlparse, urlunparse
+from stat import S_ISDIR
+from xdg.BaseDirectory import xdg_config_home
+import logging
+import yaml
+import __main__ as main
+
+def load_config(path=None, groupnames=None):
+ main_script = os.path.basename(main.__file__)
+ if path is None:
+ for p in [Path(),
+ Path(xdg_config_home).joinpath('webmap'),
+ PosixPath('/etc').joinpath('webmap')]:
+ p = p.joinpath('config.yml')
+ if p.exists():
+ path = str(p)
+ break
+ with open(path, 'r') as fp:
+ config = yaml.safe_load(fp)
+ layers = config.get('layers', {})
+
+ # validate sources
+ destinations = {}
+ for name, layer in layers.items():
+ if isinstance(layer, dict):
+ layers[name] = layer = [layer]
+
+ for sourcedef in layer:
+ source = sourcedef.get('source', None)
+ if source is None:
+ continue
+ download = source.get('download', None)
+ if download is None:
+ url = None
+ script = None
+ elif isinstance(download, str):
+ url = download
+ script = None
+ source['download'] = download = { 'url': url }
+ else:
+ url = download.get('url', None)
+ script = download.get('script', None)
+ if url is None:
+ urlp = None
+ else:
+ urlp = urlparse(url)
+ if urlp is None:
+ raise Exception(f'urlparse({url}) failed')
+
+ cache = source.get('cache', None)
+ if cache is None or isinstance(cache, str):
+ source['cache'] = { 'path': cache }
+ else:
+ cache = cache.get('path', None)
+
+ if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
+ # infer filename from the source URL
+ if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
+ raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
+ p = PosixPath(urlp.path)
+ if p is None or p.name is None or p.name == '':
+ raise Exception(f'Invalid PosixPath({urlp.path})')
+ if cache is None or cache == '':
+ cache = Path()
+ else:
+ cache = Path(cache)
+ cache = cache.joinpath(p.name)
+ else:
+ cache = Path(cache)
+ source['cache']['path'] = cache
+
+ v = { 'url': urlp, 'script': main_script if script is None else script }
+ if cache in destinations and destinations[cache] != v:
+ # allow destination conflicts, but only when the source URL and script match
+ raise Exception(f'Destination conflict for layer "{name}"')
+ destinations[cache] = v
+
+ # filter layers that are not of interest
+ if groupnames is not None:
+ layernames = []
+ layer_groups = config.get('layer-groups', {})
+ for groupname in groupnames:
+ if groupname not in layer_groups:
+ if groupname in layers:
+ # fallback to layer names
+ layernames.append(groupname)
+ else:
+ logging.error('Unknown group/layer name "%s"', groupname)
+ exit(1)
+ else:
+ patterns = layer_groups[groupname]
+ if isinstance(patterns, str):
+ patterns = [patterns]
+ for pat in patterns:
+ has_match = False
+ for layername in layers:
+ if fnmatchcase(layername, pat):
+ if layername in layernames:
+ logging.debug('Layer "%s" was already added, skipping', layername)
+ else:
+ layernames.append(layername)
+ has_match = True
+ if has_match:
+ pass
+ elif pat in layers:
+ # fallback to exact match
+ if pat in layernames:
+ logging.debug('Layer "%s" was already added, skipping', pat)
+ else:
+ layernames.append(pat)
+ else:
+ logging.warning('Group name "%s" does not match anything', groupname)
+
+ layers = { name: layers[name] for name in layernames }
+
+ config['layers'] = layers
+ sys.modules[__name__].config = config
+
+
+######
+# The function definitions below are taken from cpython's source code
+# and augmented with dir_fd.
+
+# Is a path a directory?
+# (From genericpath.py.)
+def isdir(path, dir_fd=None, follow_symlinks=True):
+ try:
+ st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
+ except (OSError, ValueError):
+ return False
+ return S_ISDIR(st.st_mode)
+
+# Does a path exist?
+# (From genericpath.py.)
+def exists(path, dir_fd=None, follow_symlinks=True):
+ try:
+ os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
+ except (OSError, ValueError):
+ return False
+ return True
+
+# Create a leaf directory and all intermediate ones.
+# (From os.py.)
+def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
+ head, tail = path.split(name)
+ if not tail:
+ head, tail = path.split(head)
+ if head and tail and not exists(head, dir_fd=dir_fd):
+ try:
+ makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
+ except FileExistsError:
+ # Defeats race condition when another thread created the path
+ pass
+ cdir = os.curdir
+ if isinstance(tail, bytes):
+ cdir = bytes(os.curdir, 'ASCII')
+ if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
+ return
+ if logging is not None:
+ logging.debug('mkdir("%s", 0%o)', name, mode)
+ try:
+ os.mkdir(name, mode, dir_fd=dir_fd)
+ except OSError:
+ # Cannot rely on checking for EEXIST, since the operating system
+ # could give priority to other errors like EACCES or EROFS
+ if not exist_ok or not isdir(name, dir_fd=dir_fd):
+ raise