diff options
author | Guilhem Moulin <guilhem@fripost.org> | 2025-04-17 12:23:38 +0200 |
---|---|---|
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-04-19 05:14:48 +0200 |
commit | 2abf2297aabb355b72c6ae9e0aaf350f7a6cbe9d (patch) | |
tree | dd1a157dc6e08a96fdb99d79a6cf2e43047f3650 /common.py | |
parent | 4bcf4d8a3229c89813cbf3c05f4ef14cc80202d9 (diff) |
Add type hints and refactor a bit to please pylint.
Diffstat (limited to 'common.py')
-rw-r--r-- | common.py | 210 |
1 files changed, 127 insertions, 83 deletions
@@ -16,18 +16,25 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. #---------------------------------------------------------------------- -import os, sys -from os import path +# pylint: disable=missing-module-docstring + +import os +from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep +import sys from fnmatch import fnmatchcase from pathlib import Path, PosixPath -from urllib.parse import urlparse, urlunparse +from urllib.parse import urlparse from stat import S_ISDIR -from math import floor -from xdg.BaseDirectory import xdg_config_home +import math import logging +from typing import Any, Optional, Never + +from xdg.BaseDirectory import xdg_config_home import yaml -def init_logger(app=__file__, level=logging.WARNING): +def init_logger(app : str =__file__, level : int = logging.WARNING) -> logging.Logger: + """Initialize the logger""" + log_fmt = logging.Formatter('%(levelname)s: %(message)s') log = logging.getLogger() log.setLevel(level) @@ -38,25 +45,42 @@ def init_logger(app=__file__, level=logging.WARNING): ch = logging.StreamHandler() else: # started in systemd, use journald for filtering incl. coloring - from systemd.journal import JournalHandler + from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel ch = JournalHandler(SYSLOG_IDENTIFIER=app) ch.setFormatter(log_fmt) log.addHandler(ch) return log -def load_config(path=None, groupnames=None): - if path is None: - for p in [Path(), - Path(xdg_config_home).joinpath('webmap'), - PosixPath('/etc').joinpath('webmap')]: - p = str(p.joinpath('config.yml')) - if os.path.exists(p): - path = p - break - if path is None: - raise Exception('Could not find configuration file') - with open(path, 'r') as fp: +class MissingConfiguration(Exception): + """Exception raised when no configuration file could be found""" + def __init__(self, name : str) -> Never: + super().__init__(f'Could not find configuration file {name}') + +def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path: + """Return the configuration file path""" + dirs = [ + Path(), + Path(xdg_config_home).joinpath(appname), + PosixPath('/etc').joinpath(appname) + ] + for d in dirs: + p = d.joinpath(filename) + if p.exists(): + return p + raise MissingConfiguration(filename) + +class BadConfiguration(Exception): + """Exception raised when there is a bad configuration""" + def __init__(self, config_path : Path, message : str) -> Never: + super().__init__(str(config_path) + ': ' + message) + +def parse_config(path : Optional[Path] = None, + groupnames : Optional[list[str]] = None) -> dict[str, Any]: + """Parse configuration file""" + + config_path = find_config() if path is None else path + with config_path.open(mode='r', encoding='utf-8') as fp: config = yaml.safe_load(fp) layers = config.get('layers', {}) @@ -71,7 +95,8 @@ def load_config(path=None, groupnames=None): layerdefs = layers[name] if 'sources' not in layerdefs: - raise Exception(f'Layer "{name}" does not have any source receipe') + # pylint: disable-next=broad-exception-raised + raise Exception(f'Layer "{name}" does not have any source recipe') for sourcedef in layerdefs.get('sources', []): source = sourcedef.get('source', None) @@ -93,6 +118,7 @@ def load_config(path=None, groupnames=None): else: urlp = urlparse(url) if urlp is None: + # pylint: disable-next=broad-exception-raised raise Exception(f'urlparse({url}) failed') cache = source.get('cache', None) @@ -101,12 +127,14 @@ def load_config(path=None, groupnames=None): else: cache = cache.get('path', None) - if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep): + if cache is None or cache in ['', os_curdir, os_pardir] or cache.endswith(os_sep): # infer filename from the source URL if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'): + # pylint: disable-next=broad-exception-raised raise Exception(f'Layer "{name}": Could not infer filename from URL {url}') p = PosixPath(urlp.path) if p is None or p.name is None or p.name == '': + # pylint: disable-next=broad-exception-raised raise Exception(f'Invalid PosixPath({urlp.path})') if cache is None or cache == '': cache = Path() @@ -120,6 +148,7 @@ def load_config(path=None, groupnames=None): v = { 'url': urlp, 'module': dl_module } if cache in destinations and destinations[cache] != v: # allow destination conflicts, but only when the source URL and module match + # pylint: disable-next=broad-exception-raised raise Exception(f'Destination conflict for layer "{name}"') destinations[cache] = v @@ -134,7 +163,7 @@ def load_config(path=None, groupnames=None): layernames.append(groupname) else: logging.error('Unknown group/layer name "%s"', groupname) - exit(1) + sys.exit(1) else: patterns = layer_groups[groupname] if isinstance(patterns, str): @@ -168,65 +197,72 @@ def load_config(path=None, groupnames=None): if isinstance(extent, list): config['extent'] = tuple(extent) if config.get('SRS', None) is None: + # pylint: disable-next=broad-exception-raised raise Exception('Configured extent without SRS') - sys.modules[__name__].config = config + return config + +def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str: + """Format a number of bytes to a SI unit""" -def format_bytes(n): - if n < 768: + if n < threshold: return f'{n}\u202FB' - elif n < 768*1024: - return f'{n/1024:.2f}\u202FkiB' - elif n < 768*1024*1024: - return f'{n/1048576:.2f}\u202FMiB' - else: - return f'{n/1073741824:.2f}\u202FGiB' + if n < threshold * 1024: + return f'{n/1024:.{precision}f}\u202FkiB' + if n < threshold * 1048576: + return f'{n/1048576:.{precision}f}\u202FMiB' + return f'{n/1073741824:.{precision}f}\u202FGiB' def format_time(ts : float, precision : int = 3) -> str: + """Format a timestamp to HH:MM:SS.fff""" + w = 2 if precision == 0 else precision + 3 ts = round(ts, precision) - m = floor(ts/60.) + m = math.floor(ts/60.) s = ts - 60. * m h, m = divmod(m, 60) return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}' -# Return a boolean indicating whether the installer GDAL version is -# greater than or equal to the provider (maj, min, rev) triplet. -def gdalVersionMin(maj=0, min=0, rev=0): +# pylint: disable-next=invalid-name, redefined-builtin +def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool: + """Return a boolean indicating whether the installer GDAL version is + greater than or equal to the provider (maj, min, rev) triplet.""" + if maj < 1 or (maj == 1 and min < 10): # GDAL_VERSION_NUM() macro was changed in 1.10. That version # was released in 2013 so we blindly assume the installer # version is more recent return True - from osgeo import gdal - gdal.UseExceptions() - - version_cur = int(gdal.VersionInfo()); + version_cur = int(gdal.VersionInfo()) # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in version_min = maj*1000000 + min*10000 + rev*100 return version_min <= version_cur -# Wrapper around gdal.MajorObject.GetMetadataItem(name) -def gdalGetMetadataItem(o, k): - v = o.GetMetadataItem(k) +# pylint: disable-next=invalid-name +def gdalGetMetadataItem(obj, k : str) -> bool: + """Wrapper around gdal.MajorObject.GetMetadataItem(name).""" + + v = obj.GetMetadataItem(k) if v is not None and isinstance(v, str): return v.upper() == 'YES' - else: - return False -# Escape the given identifier, cf. -# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id() -def escapeIdentifier(identifier): + return False + +def escape_identifier(identifier : str) -> str: + """Escape the given identifier, cf. + swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id().""" + if '\x00' in identifier: + # pylint: disable-next=broad-exception-raised raise Exception(f'Invalid identifier "{identifier}"') + # SQL:1999 delimited identifier return '"' + identifier.replace('"', '""') + '"' -# Return a pair kwargs and driver to use with gdal.OpenEx() -def gdalSetOpenExArgs(option_dict, flags=0): - from osgeo import gdal - gdal.UseExceptions() +# pylint: disable-next=invalid-name,dangerous-default-value +def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0): + """Return a pair kwargs and driver to use with gdal.OpenEx().""" kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags } @@ -236,8 +272,10 @@ def gdalSetOpenExArgs(option_dict, flags=0): else: drv = gdal.GetDriverByName(fmt) if drv is None: + # pylint: disable-next=broad-exception-raised raise Exception(f'Unknown driver name "{fmt}"') - elif not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR): + if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR): + # pylint: disable-next=broad-exception-raised raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities') kwargs['allowed_drivers'] = [ drv.ShortName ] @@ -246,41 +284,46 @@ def gdalSetOpenExArgs(option_dict, flags=0): kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ] return kwargs, drv -# Return the decoded Spatial Reference System -def getSRS(srs_str): - if srs_str is None: - return +# pylint: disable-next=invalid-name +def getSRS(osr, srs_str : Optional[str]): + """Return the decoded Spatial Reference System.""" - from osgeo import osr - osr.UseExceptions() + if srs_str is None: + return None srs = osr.SpatialReference() if srs_str.startswith('EPSG:'): code = int(srs_str.removeprefix('EPSG:')) srs.ImportFromEPSG(code) else: + # pylint: disable-next=broad-exception-raised raise Exception(f'Unknown SRS {srs_str}') + logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName()) return srs -# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the -# given SRS. Return a pair with the densified and non-densified extent. -# Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the -# polygon to make sure it is sufficiently densified when transforming to -# source layer SRS for spatial filtering. -def getExtent(extent, srs=None): +# pylint: disable-next=invalid-name +def getExtent(extent : tuple[float, float, float, float] | list[float], srs = None): + """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the + given SRS. Return a pair with the densified and non-densified extent. + Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the + polygon to make sure it is sufficiently densified when transforming to + source layer SRS for spatial filtering.""" + if extent is None: return None, None - if not (isinstance(extent, list) or isinstance(extent, tuple)) or len(extent) != 4: + if not isinstance(extent, (list, tuple)) or len(extent) != 4: + # pylint: disable-next=broad-exception-raised raise Exception(f'Invalid extent {extent}') - elif srs is None: + if srs is None: + # pylint: disable-next=broad-exception-raised raise Exception('Configured extent but no SRS') logging.debug('Configured extent in %s: %s', srs.GetName(), ', '.join(map(str, extent))) - from osgeo import ogr, osr + from osgeo import ogr, osr # pylint: disable=import-outside-toplevel ogr.UseExceptions() ring = ogr.Geometry(ogr.wkbLinearRing) @@ -306,9 +349,11 @@ def getExtent(extent, srs=None): polygon_dense = polygon.Clone() segment_distance_metre = 10 * 1000 if srs.IsGeographic(): + # pylint: disable-next=invalid-name dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor()) polygon_dense.Segmentize(dfMaxLength) elif srs.IsProjected(): + # pylint: disable-next=invalid-name dfMaxLength = segment_distance_metre / srs.GetLinearUnits() polygon_dense.Segmentize(dfMaxLength) @@ -320,43 +365,42 @@ def getExtent(extent, srs=None): # and augmented with dir_fd. # An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)). -# Is a path a directory? -# (From genericpath.py.) -def isdir(path, dir_fd=None, follow_symlinks=True): +def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool: + """Is a path a directory? (From genericpath.py.)""" try: st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return S_ISDIR(st.st_mode) -# Does a path exist? -# (From genericpath.py.) -def exists(path, dir_fd=None, follow_symlinks=True): +def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool: + """Does a path exist? (From genericpath.py.)""" try: os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks) except (OSError, ValueError): return False return True -# Create a leaf directory and all intermediate ones. -# (From os.py.) -def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None): - head, tail = path.split(name) +def makedirs(name : str, mode : int = 0o777, + exist_ok : bool = False, + dir_fd : Optional[int] = None) -> None: + """Create a leaf directory and all intermediate ones. (From os.py.)""" + + head, tail = os_path.split(name) if not tail: - head, tail = path.split(head) + head, tail = os_path.split(head) if head and tail and not exists(head, dir_fd=dir_fd): try: - makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging) + makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd) except FileExistsError: # Defeats race condition when another thread created the path pass - cdir = os.curdir + cdir = os_curdir if isinstance(tail, bytes): - cdir = bytes(os.curdir, 'ASCII') + cdir = bytes(os_curdir, 'ASCII') if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists return - if logging is not None: - logging.debug('mkdir("%s", 0%o)', name, mode) + logging.debug('mkdir("%s", 0%o)', name, mode) try: os.mkdir(name, mode, dir_fd=dir_fd) except OSError: |