aboutsummaryrefslogtreecommitdiffstats
path: root/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'common.py')
-rw-r--r--common.py83
1 files changed, 45 insertions, 38 deletions
diff --git a/common.py b/common.py
index b1d14ba..0035160 100644
--- a/common.py
+++ b/common.py
@@ -26,7 +26,7 @@ from pathlib import Path, PosixPath
from stat import S_ISDIR
import math
import logging
-from typing import Any, Optional, Never
+from typing import Any, Iterator, Optional, Never
from hashlib import sha256
from xdg.BaseDirectory import xdg_config_home
@@ -39,8 +39,8 @@ def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.L
log = logging.getLogger()
log.setLevel(level)
- pid = os.getenv('SYSTEMD_EXEC_PID', None)
- if (pid is None or int(pid) != os.getpid()
+ if (os.getenv('SYSTEMD_EXEC_PID', None) is None
+ or os.getenv('INVOCATION_ID', None) is None
or os.getenv('JOURNAL_STREAM', None) is None):
ch = logging.StreamHandler()
else:
@@ -77,13 +77,35 @@ def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path
return p
raise MissingConfiguration(filename)
+def load_config(path : Optional[Path] = None) -> dict[str, Any]:
+ """Load configuration file"""
+ config_path = find_config() if path is None else path
+ with config_path.open(mode='r', encoding='utf-8') as fp:
+ return yaml.safe_load(fp)
+
+def layers_in_group(groupname : str, patterns : str|list[str],
+ layernames : set[str]) -> Iterator[str]:
+ """Get layer names matching the given patterns"""
+ if isinstance(patterns, str):
+ patterns = [patterns]
+ for pat in patterns:
+ has_match = False
+ for layername in layernames:
+ if fnmatchcase(layername, pat):
+ yield layername
+ has_match = True
+ if has_match:
+ continue
+ if pat in layernames:
+ # fallback to exact match
+ yield pat
+ else:
+ logging.warning('Pattern "%s" in group "%s" does not match anything', pat, groupname)
+
def parse_config(path : Optional[Path] = None,
groupnames : Optional[list[str]] = None) -> dict[str, Any]:
"""Parse configuration file"""
-
- config_path = find_config() if path is None else path
- with config_path.open(mode='r', encoding='utf-8') as fp:
- config = yaml.safe_load(fp)
+ config = load_config(path)
layers = config.get('layers', {})
for name, layerdefs in layers.items():
@@ -96,39 +118,25 @@ def parse_config(path : Optional[Path] = None,
# filter layers that are not of interest
if groupnames is not None:
- layernames = []
+ layernames = set()
+ layernames_all = set(layers.keys())
layer_groups = config.get('layer-groups', {})
for groupname in groupnames:
- if groupname not in layer_groups:
- if groupname in layers:
- # fallback to layer names
- layernames.append(groupname)
+ if groupname in layer_groups:
+ for name in layers_in_group(groupname, layer_groups[groupname], layernames_all):
+ if name in layernames:
+ logging.debug('Layer "%s" was already added, skipping', name)
+ else:
+ layernames.add(name)
+ elif groupname in layers:
+ # fallback to layer names
+ if groupname in layernames:
+ logging.debug('Layer "%s" was already added, skipping', groupname)
else:
- logging.error('Unknown group/layer name "%s"', groupname)
- sys.exit(1)
+ layernames.add(groupname)
else:
- patterns = layer_groups[groupname]
- if isinstance(patterns, str):
- patterns = [patterns]
- for pat in patterns:
- has_match = False
- for layername in layers:
- if fnmatchcase(layername, pat):
- if layername in layernames:
- logging.debug('Layer "%s" was already added, skipping', layername)
- else:
- layernames.append(layername)
- has_match = True
- if has_match:
- pass
- elif pat in layers:
- # fallback to exact match
- if pat in layernames:
- logging.debug('Layer "%s" was already added, skipping', pat)
- else:
- layernames.append(pat)
- else:
- logging.warning('Group name "%s" does not match anything', groupname)
+ logging.error('Unknown group/layer name "%s"', groupname)
+ sys.exit(1)
layers = { name: layers[name] for name in layernames }
@@ -139,8 +147,7 @@ def parse_config(path : Optional[Path] = None,
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
- raise BadConfiguration('Configured extent without SRS',
- config_path=config_path)
+ raise BadConfiguration('Configured extent without SRS')
return config