author    Guilhem Moulin <guilhem@fripost.org>    2025-04-17 12:23:38 +0200
committer Guilhem Moulin <guilhem@fripost.org>    2025-04-19 05:14:48 +0200
commit    2abf2297aabb355b72c6ae9e0aaf350f7a6cbe9d (patch)
tree      dd1a157dc6e08a96fdb99d79a6cf2e43047f3650 /common.py
parent    4bcf4d8a3229c89813cbf3c05f4ef14cc80202d9 (diff)
Add type hints and refactor a bit to please pylint.
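
For context, a minimal sketch of how a calling script might adapt to the refactored interface (the caller name, logging level, GDAL version bound, and 'format' open option below are illustrative assumptions, not part of this commit): parse_config() now returns the parsed configuration instead of assigning it to sys.modules[__name__].config, and the GDAL/OSR helpers take the osgeo module as an explicit argument, leaving UseExceptions() to the caller.

    # Hypothetical caller (not part of this commit); names and values are
    # illustrative assumptions.
    import logging
    from osgeo import gdal, osr

    import common

    common.init_logger(app='webmap-example', level=logging.INFO)
    gdal.UseExceptions()   # no longer enabled inside the helpers
    osr.UseExceptions()

    # parse_config() returns the configuration; with path=None it falls
    # back to find_config(), which raises MissingConfiguration if no
    # config.yml is found.
    config = common.parse_config()

    # The osgeo modules are now passed in explicitly.
    if not common.gdalVersionMin(gdal, maj=3, min=6):   # version bound is made up
        raise RuntimeError('this example assumes GDAL >= 3.6')

    # The 'format' key is an assumption about the option dict layout.
    kwargs, drv = common.gdalSetOpenExArgs(gdal, {'format': 'GPKG'})
    srs = common.getSRS(osr, config.get('SRS'))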
Diffstat (limited to 'common.py')
-rw-r--r--    common.py    210
1 file changed, 127 insertions(+), 83 deletions(-)
diff --git a/common.py b/common.py
index f0ddaa4..da2927f 100644
--- a/common.py
+++ b/common.py
@@ -16,18 +16,25 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
-import os, sys
-from os import path
+# pylint: disable=missing-module-docstring
+
+import os
+from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep
+import sys
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
-from urllib.parse import urlparse, urlunparse
+from urllib.parse import urlparse
from stat import S_ISDIR
-from math import floor
-from xdg.BaseDirectory import xdg_config_home
+import math
import logging
+from typing import Any, Optional, Never
+
+from xdg.BaseDirectory import xdg_config_home
import yaml
-def init_logger(app=__file__, level=logging.WARNING):
+def init_logger(app : str = __file__, level : int = logging.WARNING) -> logging.Logger:
+ """Initialize the logger"""
+
log_fmt = logging.Formatter('%(levelname)s: %(message)s')
log = logging.getLogger()
log.setLevel(level)
@@ -38,25 +45,42 @@ def init_logger(app=__file__, level=logging.WARNING):
ch = logging.StreamHandler()
else:
# started in systemd, use journald for filtering incl. coloring
- from systemd.journal import JournalHandler
+ from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel
ch = JournalHandler(SYSLOG_IDENTIFIER=app)
ch.setFormatter(log_fmt)
log.addHandler(ch)
return log
-def load_config(path=None, groupnames=None):
- if path is None:
- for p in [Path(),
- Path(xdg_config_home).joinpath('webmap'),
- PosixPath('/etc').joinpath('webmap')]:
- p = str(p.joinpath('config.yml'))
- if os.path.exists(p):
- path = p
- break
- if path is None:
- raise Exception('Could not find configuration file')
- with open(path, 'r') as fp:
+class MissingConfiguration(Exception):
+ """Exception raised when no configuration file could be found"""
+ def __init__(self, name : str) -> None:
+ super().__init__(f'Could not find configuration file {name}')
+
+def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
+ """Return the configuration file path"""
+ dirs = [
+ Path(),
+ Path(xdg_config_home).joinpath(appname),
+ PosixPath('/etc').joinpath(appname)
+ ]
+ for d in dirs:
+ p = d.joinpath(filename)
+ if p.exists():
+ return p
+ raise MissingConfiguration(filename)
+
+class BadConfiguration(Exception):
+ """Exception raised when there is a bad configuration"""
+ def __init__(self, config_path : Path, message : str) -> None:
+ super().__init__(str(config_path) + ': ' + message)
+
+def parse_config(path : Optional[Path] = None,
+ groupnames : Optional[list[str]] = None) -> dict[str, Any]:
+ """Parse configuration file"""
+
+ config_path = find_config() if path is None else path
+ with config_path.open(mode='r', encoding='utf-8') as fp:
config = yaml.safe_load(fp)
layers = config.get('layers', {})
@@ -71,7 +95,8 @@ def load_config(path=None, groupnames=None):
layerdefs = layers[name]
if 'sources' not in layerdefs:
- raise Exception(f'Layer "{name}" does not have any source receipe')
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Layer "{name}" does not have any source recipe')
for sourcedef in layerdefs.get('sources', []):
source = sourcedef.get('source', None)
@@ -93,6 +118,7 @@ def load_config(path=None, groupnames=None):
else:
urlp = urlparse(url)
if urlp is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'urlparse({url}) failed')
cache = source.get('cache', None)
@@ -101,12 +127,14 @@ def load_config(path=None, groupnames=None):
else:
cache = cache.get('path', None)
- if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
+ if cache is None or cache in ['', os_curdir, os_pardir] or cache.endswith(os_sep):
# infer filename from the source URL
if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
p = PosixPath(urlp.path)
if p is None or p.name is None or p.name == '':
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid PosixPath({urlp.path})')
if cache is None or cache == '':
cache = Path()
@@ -120,6 +148,7 @@ def load_config(path=None, groupnames=None):
v = { 'url': urlp, 'module': dl_module }
if cache in destinations and destinations[cache] != v:
# allow destination conflicts, but only when the source URL and module match
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Destination conflict for layer "{name}"')
destinations[cache] = v
@@ -134,7 +163,7 @@ def load_config(path=None, groupnames=None):
layernames.append(groupname)
else:
logging.error('Unknown group/layer name "%s"', groupname)
- exit(1)
+ sys.exit(1)
else:
patterns = layer_groups[groupname]
if isinstance(patterns, str):
@@ -168,65 +197,72 @@ def load_config(path=None, groupnames=None):
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception('Configured extent without SRS')
- sys.modules[__name__].config = config
+ return config
+
+def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
+ """Format a number of bytes to a SI unit"""
-def format_bytes(n):
- if n < 768:
+ if n < threshold:
return f'{n}\u202FB'
- elif n < 768*1024:
- return f'{n/1024:.2f}\u202FkiB'
- elif n < 768*1024*1024:
- return f'{n/1048576:.2f}\u202FMiB'
- else:
- return f'{n/1073741824:.2f}\u202FGiB'
+ if n < threshold * 1024:
+ return f'{n/1024:.{precision}f}\u202FkiB'
+ if n < threshold * 1048576:
+ return f'{n/1048576:.{precision}f}\u202FMiB'
+ return f'{n/1073741824:.{precision}f}\u202FGiB'
def format_time(ts : float, precision : int = 3) -> str:
+ """Format a timestamp to HH:MM:SS.fff"""
+
w = 2 if precision == 0 else precision + 3
ts = round(ts, precision)
- m = floor(ts/60.)
+ m = math.floor(ts/60.)
s = ts - 60. * m
h, m = divmod(m, 60)
return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
-# Return a boolean indicating whether the installer GDAL version is
-# greater than or equal to the provider (maj, min, rev) triplet.
-def gdalVersionMin(maj=0, min=0, rev=0):
+# pylint: disable-next=invalid-name, redefined-builtin
+def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool:
+ """Return a boolean indicating whether the installer GDAL version is
+ greater than or equal to the provider (maj, min, rev) triplet."""
+
if maj < 1 or (maj == 1 and min < 10):
# GDAL_VERSION_NUM() macro was changed in 1.10. That version
# was released in 2013 so we blindly assume the installed
# version is more recent
return True
- from osgeo import gdal
- gdal.UseExceptions()
-
- version_cur = int(gdal.VersionInfo());
+ version_cur = int(gdal.VersionInfo())
# cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
version_min = maj*1000000 + min*10000 + rev*100
return version_min <= version_cur
-# Wrapper around gdal.MajorObject.GetMetadataItem(name)
-def gdalGetMetadataItem(o, k):
- v = o.GetMetadataItem(k)
+# pylint: disable-next=invalid-name
+def gdalGetMetadataItem(obj, k : str) -> bool:
+ """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
+
+ v = obj.GetMetadataItem(k)
if v is not None and isinstance(v, str):
return v.upper() == 'YES'
- else:
- return False
-# Escape the given identifier, cf.
-# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()
-def escapeIdentifier(identifier):
+ return False
+
+def escape_identifier(identifier : str) -> str:
+ """Escape the given identifier, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
+
if '\x00' in identifier:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid identifier "{identifier}"')
+
# SQL:1999 delimited identifier
return '"' + identifier.replace('"', '""') + '"'
-# Return a pair kwargs and driver to use with gdal.OpenEx()
-def gdalSetOpenExArgs(option_dict, flags=0):
- from osgeo import gdal
- gdal.UseExceptions()
+# pylint: disable-next=invalid-name,dangerous-default-value
+def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0):
+ """Return a pair kwargs and driver to use with gdal.OpenEx()."""
kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }
@@ -236,8 +272,10 @@ def gdalSetOpenExArgs(option_dict, flags=0):
else:
drv = gdal.GetDriverByName(fmt)
if drv is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Unknown driver name "{fmt}"')
- elif not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
kwargs['allowed_drivers'] = [ drv.ShortName ]
@@ -246,41 +284,46 @@ def gdalSetOpenExArgs(option_dict, flags=0):
kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
return kwargs, drv
-# Return the decoded Spatial Reference System
-def getSRS(srs_str):
- if srs_str is None:
- return
+# pylint: disable-next=invalid-name
+def getSRS(osr, srs_str : Optional[str]):
+ """Return the decoded Spatial Reference System."""
- from osgeo import osr
- osr.UseExceptions()
+ if srs_str is None:
+ return None
srs = osr.SpatialReference()
if srs_str.startswith('EPSG:'):
code = int(srs_str.removeprefix('EPSG:'))
srs.ImportFromEPSG(code)
else:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Unknown SRS {srs_str}')
+
logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
return srs
-# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
-# given SRS. Return a pair with the densified and non-densified extent.
-# Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
-# polygon to make sure it is sufficiently densified when transforming to
-# source layer SRS for spatial filtering.
-def getExtent(extent, srs=None):
+# pylint: disable-next=invalid-name
+def getExtent(extent : Optional[tuple[float, float, float, float] | list[float]], srs = None):
+ """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
+ given SRS. Return a pair with the densified and non-densified extent.
+ Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
+ polygon to make sure it is sufficiently densified when transforming to
+ source layer SRS for spatial filtering."""
+
if extent is None:
return None, None
- if not (isinstance(extent, list) or isinstance(extent, tuple)) or len(extent) != 4:
+ if not isinstance(extent, (list, tuple)) or len(extent) != 4:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid extent {extent}')
- elif srs is None:
+ if srs is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception('Configured extent but no SRS')
logging.debug('Configured extent in %s: %s',
srs.GetName(), ', '.join(map(str, extent)))
- from osgeo import ogr, osr
+ from osgeo import ogr, osr # pylint: disable=import-outside-toplevel
ogr.UseExceptions()
ring = ogr.Geometry(ogr.wkbLinearRing)
@@ -306,9 +349,11 @@ def getExtent(extent, srs=None):
polygon_dense = polygon.Clone()
segment_distance_metre = 10 * 1000
if srs.IsGeographic():
+ # pylint: disable-next=invalid-name
dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
polygon_dense.Segmentize(dfMaxLength)
elif srs.IsProjected():
+ # pylint: disable-next=invalid-name
dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
polygon_dense.Segmentize(dfMaxLength)
@@ -320,43 +365,42 @@ def getExtent(extent, srs=None):
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).
-# Is a path a directory?
-# (From genericpath.py.)
-def isdir(path, dir_fd=None, follow_symlinks=True):
+def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Is a path a directory? (From genericpath.py.)"""
try:
st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return S_ISDIR(st.st_mode)
-# Does a path exist?
-# (From genericpath.py.)
-def exists(path, dir_fd=None, follow_symlinks=True):
+def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Does a path exist? (From genericpath.py.)"""
try:
os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return True
-# Create a leaf directory and all intermediate ones.
-# (From os.py.)
-def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
- head, tail = path.split(name)
+def makedirs(name : str, mode : int = 0o777,
+ exist_ok : bool = False,
+ dir_fd : Optional[int] = None) -> None:
+ """Create a leaf directory and all intermediate ones. (From os.py.)"""
+
+ head, tail = os_path.split(name)
if not tail:
- head, tail = path.split(head)
+ head, tail = os_path.split(head)
if head and tail and not exists(head, dir_fd=dir_fd):
try:
- makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
+ makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd)
except FileExistsError:
# Defeats race condition when another thread created the path
pass
- cdir = os.curdir
+ cdir = os_curdir
if isinstance(tail, bytes):
- cdir = bytes(os.curdir, 'ASCII')
+ cdir = bytes(os_curdir, 'ASCII')
if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
return
- if logging is not None:
- logging.debug('mkdir("%s", 0%o)', name, mode)
+ logging.debug('mkdir("%s", 0%o)', name, mode)
try:
os.mkdir(name, mode, dir_fd=dir_fd)
except OSError: