-rw-r--r--  .pylintrc                          7
-rwxr-xr-x  administrative-codes/csv2json      9
-rwxr-xr-x  administrative-codes/update       21
-rw-r--r--  common.py                        210
-rwxr-xr-x  webmap-download                   88
-rw-r--r--  webmap-download-mrr.py             2
-rwxr-xr-x  webmap-import                     30
-rwxr-xr-x  webmap-publish                    28
8 files changed, 240 insertions(+), 155 deletions(-)
diff --git a/.pylintrc b/.pylintrc
new file mode 100644
index 0000000..54b0100
--- /dev/null
+++ b/.pylintrc
@@ -0,0 +1,7 @@
+[DESIGN]
+max-args = 20
+max-positional-arguments = 20
+max-locals = 50
+max-branches = 25
+max-statements = 100
+max-nested-blocks = 10
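These [DESIGN] limits relax pylint's default complexity thresholds rather than tighten them. A minimal sketch of checking a file against this configuration from Python, assuming pylint is installed (the target filename is only an example):

    # Lint one file with the repository's .pylintrc (sketch, not part of the repo).
    from pylint.lint import Run

    # exit=False returns control to the caller instead of calling sys.exit().
    Run(['--rcfile', '.pylintrc', 'common.py'], exit=False)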
diff --git a/administrative-codes/csv2json b/administrative-codes/csv2json
index 7c22666..6dd6ad7 100755
--- a/administrative-codes/csv2json
+++ b/administrative-codes/csv2json
@@ -18,6 +18,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
+# pylint: disable=missing-module-docstring
+
import sys
import csv
from pathlib import Path
@@ -25,13 +27,14 @@ import json
basedir = Path(sys.argv[0]).parent
data = {}
-def readCSV(path):
- with open(path, mode='r', newline='') as fp:
+def readCSV(pathname): # pylint: disable=invalid-name
+ """Read CSV"""
+ with open(pathname, mode='r', newline='', encoding='utf-8') as fp:
reader = csv.DictReader(fp, delimiter='\t', dialect='unix')
for row in reader:
code = row['Code']
if code in data:
- raise Exception(f'Duplicate code {code}')
+ raise RuntimeError(f'Duplicate code {code}')
data[code] = row['Name']
# The source (SCB) lists all codes in same file: they differ only in
diff --git a/administrative-codes/update b/administrative-codes/update
index 855f73b..fa6a6da 100755
--- a/administrative-codes/update
+++ b/administrative-codes/update
@@ -18,6 +18,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
+# pylint: disable=missing-module-docstring
+
import re
import sys
import csv
@@ -30,13 +32,15 @@ import xlrd
#
# Unfortunately SCB doesn't provide a CSV, so we download their xls file and produce our own.
# https://www.scb.se/hitta-statistik/regional-statistik-och-kartor/regionala-indelningar/lan-och-kommuner/lan-och-kommuner-i-kodnummerordning/
-r = requests.get('https://www.scb.se/contentassets/7a89e48960f741e08918e489ea36354a/kommunlankod_2024.xls')
+r = requests.get('https://www.scb.se/contentassets/7a89e48960f741e08918e489ea36354a/'
+ 'kommunlankod_2024.xls',
+ timeout=30)
r.raise_for_status()
if 'content-type' not in r.headers:
- raise Exception('Missing Content-Type from response headers')
-if r.headers['content-type'] not in ['application/vnd.ms-excel', 'application/octet-stream']:
- raise Exception(f"Unsupported Content-Type: {r.headers['content-type']}")
+ raise RuntimeError('Missing Content-Type from response headers')
+if r.headers['content-type'] not in ('application/vnd.ms-excel', 'application/octet-stream'):
+ raise RuntimeError(f"Unsupported Content-Type: {r.headers['content-type']}")
xls = xlrd.open_workbook(file_contents=r.content)
sheets = xls.sheet_names()
@@ -71,15 +75,16 @@ for i in range(sheet.nrows):
counties.append(row)
basedir = Path(sys.argv[0]).parent
-def writeCSV(filename, data):
+def writeCSV(filename, data): # pylint: disable=invalid-name
+ """Write CSV file."""
fieldnames = ['Code', 'Name']
path = basedir.joinpath(filename).with_suffix('.csv')
- with path.open(mode='w', newline='') as fp:
+ with path.open(mode='w', newline='', encoding='utf-8') as fp:
writer = csv.DictWriter(fp, fieldnames=fieldnames, delimiter='\t',
quoting=csv.QUOTE_MINIMAL, dialect='unix')
writer.writeheader()
- for row in data:
- writer.writerow(row)
+ for row2 in data:
+ writer.writerow(row2)
print(f'Wrote {len(data)} rows in {path}', file=sys.stderr)
writeCSV('counties', counties)
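The update script feeds the downloaded bytes straight into xlrd. A minimal sketch of the xlrd access pattern used above, assuming a local copy of the workbook (recent xlrd releases only read legacy .xls, which is what SCB serves):

    # Sketch of the xlrd row-iteration pattern used by the update script.
    import xlrd

    book = xlrd.open_workbook('kommunlankod_2024.xls')  # or file_contents=r.content
    sheet = book.sheet_by_index(0)
    for i in range(sheet.nrows):
        row = sheet.row_values(i)   # list of cell values for row i
        print(row)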
diff --git a/common.py b/common.py
index f0ddaa4..da2927f 100644
--- a/common.py
+++ b/common.py
@@ -16,18 +16,25 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
-import os, sys
-from os import path
+# pylint: disable=missing-module-docstring
+
+import os
+from os import path as os_path, curdir as os_curdir, pardir as os_pardir, sep as os_sep
+import sys
from fnmatch import fnmatchcase
from pathlib import Path, PosixPath
-from urllib.parse import urlparse, urlunparse
+from urllib.parse import urlparse
from stat import S_ISDIR
-from math import floor
-from xdg.BaseDirectory import xdg_config_home
+import math
import logging
+from typing import Any, Optional
+
+from xdg.BaseDirectory import xdg_config_home
import yaml
-def init_logger(app=__file__, level=logging.WARNING):
+def init_logger(app : str = __file__, level : int = logging.WARNING) -> logging.Logger:
+ """Initialize the logger"""
+
log_fmt = logging.Formatter('%(levelname)s: %(message)s')
log = logging.getLogger()
log.setLevel(level)
@@ -38,25 +45,42 @@ def init_logger(app=__file__, level=logging.WARNING):
ch = logging.StreamHandler()
else:
# started in systemd, use journald for filtering incl. coloring
- from systemd.journal import JournalHandler
+ from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel
ch = JournalHandler(SYSLOG_IDENTIFIER=app)
ch.setFormatter(log_fmt)
log.addHandler(ch)
return log
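A sketch of the intended call site, assuming the usual pattern of configuring the root logger once at program start (the app name and level are examples):

    # Sketch: one-time logger setup at program start.
    import logging
    import common

    log = common.init_logger(app='webmap-download', level=logging.INFO)
    log.info('starting up')   # stderr line, or a journald entry under systemd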
-def load_config(path=None, groupnames=None):
- if path is None:
- for p in [Path(),
- Path(xdg_config_home).joinpath('webmap'),
- PosixPath('/etc').joinpath('webmap')]:
- p = str(p.joinpath('config.yml'))
- if os.path.exists(p):
- path = p
- break
- if path is None:
- raise Exception('Could not find configuration file')
- with open(path, 'r') as fp:
+class MissingConfiguration(Exception):
+ """Exception raised when no configuration file could be found"""
+ def __init__(self, name : str) -> None:
+ super().__init__(f'Could not find configuration file {name}')
+
+def find_config(filename : str = 'config.yml', appname : str = 'webmap') -> Path:
+ """Return the configuration file path"""
+ dirs = [
+ Path(),
+ Path(xdg_config_home).joinpath(appname),
+ PosixPath('/etc').joinpath(appname)
+ ]
+ for d in dirs:
+ p = d.joinpath(filename)
+ if p.exists():
+ return p
+ raise MissingConfiguration(filename)
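find_config() probes the current directory, then $XDG_CONFIG_HOME/webmap, then /etc/webmap, and raises MissingConfiguration when no candidate exists. A short usage sketch:

    # Sketch: resolve and parse the configuration file.
    from common import find_config, parse_config

    path = find_config()             # e.g. ~/.config/webmap/config.yml
    config = parse_config(path=path)
    print(sorted(config.get('layers', {})))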
+
+class BadConfiguration(Exception):
+ """Exception raised when there is a bad configuration"""
+ def __init__(self, config_path : Path, message : str) -> None:
+ super().__init__(str(config_path) + ': ' + message)
+
+def parse_config(path : Optional[Path] = None,
+ groupnames : Optional[list[str]] = None) -> dict[str, Any]:
+ """Parse configuration file"""
+
+ config_path = find_config() if path is None else path
+ with config_path.open(mode='r', encoding='utf-8') as fp:
config = yaml.safe_load(fp)
layers = config.get('layers', {})
@@ -71,7 +95,8 @@ def load_config(path=None, groupnames=None):
layerdefs = layers[name]
if 'sources' not in layerdefs:
- raise Exception(f'Layer "{name}" does not have any source receipe')
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Layer "{name}" does not have any source recipe')
for sourcedef in layerdefs.get('sources', []):
source = sourcedef.get('source', None)
@@ -93,6 +118,7 @@ def load_config(path=None, groupnames=None):
else:
urlp = urlparse(url)
if urlp is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'urlparse({url}) failed')
cache = source.get('cache', None)
@@ -101,12 +127,14 @@ def load_config(path=None, groupnames=None):
else:
cache = cache.get('path', None)
- if cache is None or cache in ['', os.curdir, os.pardir] or cache.endswith(os.sep):
+ if cache is None or cache in ['', os_curdir, os_pardir] or cache.endswith(os_sep):
# infer filename from the source URL
if urlp is None or urlp.path is None or urlp.path == '' or urlp.path.endswith('/'):
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Layer "{name}": Could not infer filename from URL {url}')
p = PosixPath(urlp.path)
if p is None or p.name is None or p.name == '':
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid PosixPath({urlp.path})')
if cache is None or cache == '':
cache = Path()
@@ -120,6 +148,7 @@ def load_config(path=None, groupnames=None):
v = { 'url': urlp, 'module': dl_module }
if cache in destinations and destinations[cache] != v:
# allow destination conflicts, but only when the source URL and module match
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Destination conflict for layer "{name}"')
destinations[cache] = v
@@ -134,7 +163,7 @@ def load_config(path=None, groupnames=None):
layernames.append(groupname)
else:
logging.error('Unknown group/layer name "%s"', groupname)
- exit(1)
+ sys.exit(1)
else:
patterns = layer_groups[groupname]
if isinstance(patterns, str):
@@ -168,65 +197,72 @@ def load_config(path=None, groupnames=None):
if isinstance(extent, list):
config['extent'] = tuple(extent)
if config.get('SRS', None) is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception('Configured extent without SRS')
- sys.modules[__name__].config = config
+ return config
+
+def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
+ """Format a number of bytes to a SI unit"""
-def format_bytes(n):
- if n < 768:
+ if n < threshold:
return f'{n}\u202FB'
- elif n < 768*1024:
- return f'{n/1024:.2f}\u202FkiB'
- elif n < 768*1024*1024:
- return f'{n/1048576:.2f}\u202FMiB'
- else:
- return f'{n/1073741824:.2f}\u202FGiB'
+ if n < threshold * 1024:
+ return f'{n/1024:.{precision}f}\u202FkiB'
+ if n < threshold * 1048576:
+ return f'{n/1048576:.{precision}f}\u202FMiB'
+ return f'{n/1073741824:.{precision}f}\u202FGiB'
def format_time(ts : float, precision : int = 3) -> str:
+ """Format a timestamp to HH:MM:SS.fff"""
+
w = 2 if precision == 0 else precision + 3
ts = round(ts, precision)
- m = floor(ts/60.)
+ m = math.floor(ts/60.)
s = ts - 60. * m
h, m = divmod(m, 60)
return f'{h:02d}:{m:02d}:{s:0{w}.{precision}f}'
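Worked examples for the two formatters, computed by hand from the definitions above (\u202F is a narrow no-break space, shown here as a plain space):

    format_bytes(500)               # '500 B'        (500 < 768)
    format_bytes(1536)              # '1.50 kiB'     (1536/1024, precision=2)
    format_bytes(3 * 2**20)         # '3.00 MiB'
    format_time(3723.5)             # '01:02:03.500' (3723.5 s = 1 h 2 min 3.5 s)
    format_time(59.0, precision=0)  # '00:00:59'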
-# Return a boolean indicating whether the installer GDAL version is
-# greater than or equal to the provider (maj, min, rev) triplet.
-def gdalVersionMin(maj=0, min=0, rev=0):
+# pylint: disable-next=invalid-name, redefined-builtin
+def gdalVersionMin(gdal, maj : int = 0, min : int = 0, rev : int = 0) -> bool:
+ """Return a boolean indicating whether the installer GDAL version is
+ greater than or equal to the provider (maj, min, rev) triplet."""
+
if maj < 1 or (maj == 1 and min < 10):
# GDAL_VERSION_NUM() macro was changed in 1.10. That version
# was released in 2013 so we blindly assume the installer
# version is more recent
return True
- from osgeo import gdal
- gdal.UseExceptions()
-
- version_cur = int(gdal.VersionInfo());
+ version_cur = int(gdal.VersionInfo())
# cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
version_min = maj*1000000 + min*10000 + rev*100
return version_min <= version_cur
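The comparison mirrors GDAL's GDAL_COMPUTE_VERSION macro. A worked example, assuming the GDAL Python bindings are installed: GDAL 3.7.2 reports VersionInfo() == '3070200' (3*1000000 + 7*10000 + 2*100), while a (3, 7, 0) minimum yields version_min = 3070000, so the call returns True:

    from osgeo import gdal
    gdal.UseExceptions()
    # 3070000 <= int(gdal.VersionInfo()) on any GDAL >= 3.7.0
    print(gdalVersionMin(gdal, maj=3, min=7))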
-# Wrapper around gdal.MajorObject.GetMetadataItem(name)
-def gdalGetMetadataItem(o, k):
- v = o.GetMetadataItem(k)
+# pylint: disable-next=invalid-name
+def gdalGetMetadataItem(obj, k : str) -> bool:
+ """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
+
+ v = obj.GetMetadataItem(k)
if v is not None and isinstance(v, str):
return v.upper() == 'YES'
- else:
- return False
-# Escape the given identifier, cf.
-# swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()
-def escapeIdentifier(identifier):
+ return False
+
+def escape_identifier(identifier : str) -> str:
+ """Escape the given identifier, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_id()."""
+
if '\x00' in identifier:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid identifier "{identifier}"')
+
# SQL:1999 delimited identifier
return '"' + identifier.replace('"', '""') + '"'
-# Return a pair kwargs and driver to use with gdal.OpenEx()
-def gdalSetOpenExArgs(option_dict, flags=0):
- from osgeo import gdal
- gdal.UseExceptions()
+# pylint: disable-next=invalid-name,dangerous-default-value
+def gdalSetOpenExArgs(gdal, option_dict : Optional[dict[str, Any]] = {}, flags : int = 0):
+ """Return a pair kwargs and driver to use with gdal.OpenEx()."""
kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }
@@ -236,8 +272,10 @@ def gdalSetOpenExArgs(option_dict, flags=0):
else:
drv = gdal.GetDriverByName(fmt)
if drv is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Unknown driver name "{fmt}"')
- elif not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
kwargs['allowed_drivers'] = [ drv.ShortName ]
@@ -246,41 +284,46 @@ def gdalSetOpenExArgs(option_dict, flags=0):
kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
return kwargs, drv
-# Return the decoded Spatial Reference System
-def getSRS(srs_str):
- if srs_str is None:
- return
+# pylint: disable-next=invalid-name
+def getSRS(osr, srs_str : Optional[str]):
+ """Return the decoded Spatial Reference System."""
- from osgeo import osr
- osr.UseExceptions()
+ if srs_str is None:
+ return None
srs = osr.SpatialReference()
if srs_str.startswith('EPSG:'):
code = int(srs_str.removeprefix('EPSG:'))
srs.ImportFromEPSG(code)
else:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Unknown SRS {srs_str}')
+
logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
return srs
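Only 'EPSG:<code>' strings are accepted. A usage sketch; EPSG:3006 (SWEREF99 TM) is merely a plausible choice for Swedish data, not a value taken from this configuration:

    from osgeo import osr
    osr.UseExceptions()
    srs = getSRS(osr, 'EPSG:3006')
    print(srs.GetName())   # 'SWEREF99 TM'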
-# Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
-# given SRS. Return a pair with the densified and non-densified extent.
-# Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
-# polygon to make sure it is sufficiently densified when transforming to
-# source layer SRS for spatial filtering.
-def getExtent(extent, srs=None):
+# pylint: disable-next=invalid-name
+def getExtent(extent : Optional[tuple[float, float, float, float] | list[float]], srs = None):
+ """Convert extent [minX, minY, maxX, maxY] into a polygon and assign the
+ given SRS. Return a pair with the densified and non-densified extent.
+ Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
+ polygon to make sure it is sufficiently densified when transforming to
+ source layer SRS for spatial filtering."""
+
if extent is None:
return None, None
- if not (isinstance(extent, list) or isinstance(extent, tuple)) or len(extent) != 4:
+ if not isinstance(extent, (list, tuple)) or len(extent) != 4:
+ # pylint: disable-next=broad-exception-raised
raise Exception(f'Invalid extent {extent}')
- elif srs is None:
+ if srs is None:
+ # pylint: disable-next=broad-exception-raised
raise Exception('Configured extent but no SRS')
logging.debug('Configured extent in %s: %s',
srs.GetName(), ', '.join(map(str, extent)))
- from osgeo import ogr, osr
+ from osgeo import ogr, osr # pylint: disable=import-outside-toplevel
ogr.UseExceptions()
ring = ogr.Geometry(ogr.wkbLinearRing)
@@ -306,9 +349,11 @@ def getExtent(extent, srs=None):
polygon_dense = polygon.Clone()
segment_distance_metre = 10 * 1000
if srs.IsGeographic():
+ # pylint: disable-next=invalid-name
dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
polygon_dense.Segmentize(dfMaxLength)
elif srs.IsProjected():
+ # pylint: disable-next=invalid-name
dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
polygon_dense.Segmentize(dfMaxLength)
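A worked example for the 10 km segment length: on a geographic SRS with the WGS84 semi-major axis a = 6378137 m, one degree of arc is math.radians(a) ≈ 111319.5 m, so dfMaxLength = 10000 / 111319.5 ≈ 0.0898 degrees; on a projected SRS in metres, GetLinearUnits() == 1.0 and dfMaxLength is simply 10000 m.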
@@ -320,43 +365,42 @@ def getExtent(extent, srs=None):
# and augmented with dir_fd.
# An alternative would be to use str(Path(f'/proc/self/fd/{dir_fd}').joinpath(path)).
-# Is a path a directory?
-# (From genericpath.py.)
-def isdir(path, dir_fd=None, follow_symlinks=True):
+def isdir(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Is a path a directory? (From genericpath.py.)"""
try:
st = os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return S_ISDIR(st.st_mode)
-# Does a path exist?
-# (From genericpath.py.)
-def exists(path, dir_fd=None, follow_symlinks=True):
+def exists(path : str, dir_fd : Optional[int] = None, follow_symlinks : bool = True) -> bool:
+ """Does a path exist? (From genericpath.py.)"""
try:
os.stat(path, dir_fd=dir_fd, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return True
-# Create a leaf directory and all intermediate ones.
-# (From os.py.)
-def makedirs(name, mode=0o777, exist_ok=False, dir_fd=None, logging=None):
- head, tail = path.split(name)
+def makedirs(name : str, mode : int = 0o777,
+ exist_ok : bool = False,
+ dir_fd : Optional[int] = None) -> None:
+ """Create a leaf directory and all intermediate ones. (From os.py.)"""
+
+ head, tail = os_path.split(name)
if not tail:
- head, tail = path.split(head)
+ head, tail = os_path.split(head)
if head and tail and not exists(head, dir_fd=dir_fd):
try:
- makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd, logging=logging)
+ makedirs(head, exist_ok=exist_ok, dir_fd=dir_fd)
except FileExistsError:
# Defeats race condition when another thread created the path
pass
- cdir = os.curdir
+ cdir = os_curdir
if isinstance(tail, bytes):
- cdir = bytes(os.curdir, 'ASCII')
+ cdir = bytes(os_curdir, 'ASCII')
if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
return
- if logging is not None:
- logging.debug('mkdir("%s", 0%o)', name, mode)
+ logging.debug('mkdir("%s", 0%o)', name, mode)
try:
os.mkdir(name, mode, dir_fd=dir_fd)
except OSError:
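The dir_fd variants let callers keep a single directory file descriptor open and address every path relative to it. A usage sketch, with an example cache directory:

    import os
    from common import makedirs

    dfd = os.open('/var/cache/webmap', os.O_RDONLY | os.O_CLOEXEC | os.O_DIRECTORY)
    makedirs('a/b/c', mode=0o755, exist_ok=True, dir_fd=dfd)  # creates a, a/b, a/b/c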
diff --git a/webmap-download b/webmap-download
index 917f178..2d31a19 100755
--- a/webmap-download
+++ b/webmap-download
@@ -18,8 +18,12 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------
+# pylint: disable=invalid-name,missing-module-docstring
+# pylint: enable=invalid-name
+
from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE
-import os, sys
+import os
+import sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
@@ -28,34 +32,53 @@ import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from hashlib import sha256
+from typing import Any, Optional, NoReturn
import requests
import common
-def download_trystream(url, **kwargs):
- max_tries = 10
- f = kwargs.pop('session', requests)
+def download_trystream(url : str, **kwargs) -> requests.Response:
+ """GET a url, trying a number of times. Return immediately after the
+ first chunk is received"""
+
+ max_retries = kwargs.pop('max_retries', 10)
+ f = kwargs.pop('session', None)
+ if f is None:
+ f = requests
for i in itertools.count(1):
try:
r = f.get(url, **kwargs, stream=True)
except (requests.Timeout, requests.ConnectionError):
- if i < max_tries:
+ if i < max_retries:
logging.error('timeout')
continue
raise
- else:
- r.raise_for_status()
- return r
-def download(url, dest, dir_fd=None, headers={}, session=requests, progress=None):
+ r.raise_for_status()
+ return r
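A caller-side sketch, with an example URL; any remaining keyword arguments (timeout, headers, …) are forwarded to Session.get():

    import requests

    s = requests.Session()
    r = download_trystream('https://example.org/data.zip', session=s,
                           timeout=30, max_retries=3)
    for chunk in r.iter_content(chunk_size=2**16):
        pass   # write/verify each chunk, as download() does below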
+
+class DownloadTooLarge(Exception):
+ """Exception raised when a downloaded file exceeds max-size"""
+ def __init__(self, max_size : int) -> None:
+ super().__init__(f'Payload exceeds max-size ({max_size})')
+
+# pylint: disable-next=dangerous-default-value
+def download(dest : str,
+ dl : Optional[dict[str, Any]],
+ dir_fd : Optional[int] = None,
+ headers : dict[str, str] = {},
+ session : Optional[requests.sessions.Session] = None,
+ progress = None) -> None:
+ """Process a single download recipe"""
+
url = None if dl is None else dl.get('url', None)
if url is None:
logging.error('%s has no source URL, ignoring', dest)
return
max_size = dl.get('max-size', 2**26) # 64MiB
logging.info('Downloading %s…', url)
- destPath = Path(dest)
- dest_tmp = str(destPath.with_stem(f'.{destPath.stem}.new'))
+ dest_path = Path(dest)
+ dest_tmp = str(dest_path.with_stem(f'.{dest_path.stem}.new'))
try:
# delete any leftover
os.unlink(dest_tmp, dir_fd=dir_fd)
@@ -64,7 +87,7 @@ def download(url, dest, dir_fd=None, headers={}, session=requests, progress=None
start = time_monotonic()
r = download_trystream(url, headers=headers, session=session, timeout=30)
- if r.status_code == requests.codes.not_modified:
+ if r.status_code == 304:
# XXX shouldn't we call os.utime(dest) to bump its ctime here?
# otherwise we'll make several queries and get multiple 304
# replies if the file is used by multiple layers
@@ -103,12 +126,12 @@ def download(url, dest, dir_fd=None, headers={}, session=requests, progress=None
pbar.update(chunk_size)
size += chunk_size
if max_size is not None and size > max_size:
- raise Exception(f'Payload exceeds max-size ({max_size})')
+ raise DownloadTooLarge(max_size)
fp.write(chunk)
r = None
if last_modified is not None:
- os.utime(fd, times=(last_modified, last_modified), follow_symlinks=True)
+ os.utime(fd, times=(last_modified, last_modified))
# XXX unfortunately there is no way for linkat() to clobber the destination,
# so we use a temporary file; it's racy, but thanks to O_TMPFILE better
@@ -129,10 +152,12 @@ def download(url, dest, dir_fd=None, headers={}, session=requests, progress=None
raise e
elapsed = time_monotonic() - start
- logging.info("%s: Downloaded %s in %s (%s/s)", dest, common.format_bytes(size),
- common.format_time(elapsed), common.format_bytes(int(size/elapsed)))
+ logging.info('%s: Downloaded %s in %s (%s/s)', dest,
+ common.format_bytes(size),
+ common.format_time(elapsed),
+ common.format_bytes(int(size/elapsed)))
-if __name__ == '__main__':
+def main() -> NoReturn: # pylint: disable=missing-function-docstring
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
parser = argparse.ArgumentParser(description='Download or update GIS layers.')
@@ -152,16 +177,16 @@ if __name__ == '__main__':
if args.debug > 0:
logging.getLogger().setLevel(logging.DEBUG)
if args.debug > 1:
- from http.client import HTTPConnection
+ from http.client import HTTPConnection # pylint: disable=import-outside-toplevel
HTTPConnection.debuglevel = 1
- requests_log = logging.getLogger("urllib3")
+ requests_log = logging.getLogger('urllib3')
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
- common.load_config(groupnames=None if args.groupname == [] else args.groupname)
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
sources = []
- for name, layerdefs in common.config.get('layers', {}).items():
+ for name, layerdefs in config.get('layers', {}).items():
for layerdef in layerdefs['sources']:
sourcedef = layerdef.get('source', {})
sourcedef['layername'] = name
@@ -170,7 +195,7 @@ if __name__ == '__main__':
if args.quiet or not sys.stderr.isatty():
pbar = None
else:
- from tqdm import tqdm
+ from tqdm import tqdm # pylint: disable=import-outside-toplevel
pbar = tqdm
# intentionally leave the dirfd open until the program terminates
@@ -178,7 +203,7 @@ if __name__ == '__main__':
destdir_fd = os.open(args.cachedir, opendir_args)
lockdir_fd = None if args.lockdir is None else os.open(args.lockdir, opendir_args)
- sessionRequests = requests.Session()
+ session_requests = requests.Session()
rv = 0
downloads = set()
@@ -202,21 +227,22 @@ if __name__ == '__main__':
continue
headers = {}
- user_agent = common.config.get('User-Agent', None)
+ user_agent = config.get('User-Agent', None)
if user_agent is not None:
headers['User-Agent'] = user_agent
try:
# create parent directories
destdir = os.path.dirname(dest)
- common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True, logging=logging)
+ common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True)
# place an exclusive lock on a lockfile as the destination can be used by other layers
# hence might be updated in parallel
if lockdir_fd is not None:
lockfile = sha256(dest.encode('utf-8')).hexdigest() + '.lck'
# use O_TRUNC to bump lockfile's mtime
- lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644, dir_fd=lockdir_fd)
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644,
+ dir_fd=lockdir_fd)
try:
if lockdir_fd is not None:
logging.debug('flock("%s", LOCK_EX)', lockfile)
@@ -235,16 +261,18 @@ if __name__ == '__main__':
dest, common.format_time(s))
continue
headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime, localtime=False, usegmt=True)
- fetch(dl, dest, dir_fd=destdir_fd,
- headers=headers, session=sessionRequests,
+ fetch(dest, dl, dir_fd=destdir_fd,
+ headers=headers, session=session_requests,
progress=pbar)
downloads.add(dest)
finally:
if lockdir_fd is not None:
os.close(lock_fd)
- except Exception:
+ except Exception: # pylint: disable=broad-exception-caught
logging.exception('Could not download %s as %s',
dl.get('url', source['layername']), dest)
if args.exit_code:
rv = 1
- exit(rv)
+ sys.exit(rv)
+
+main()
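The conditional-GET logic above hinges on the If-Modified-Since / 304 round trip. A sketch of the header conversion using the same stdlib helpers (1700000000 is an arbitrary stand-in for st.st_mtime):

    from email.utils import formatdate, parsedate_to_datetime

    mtime = 1700000000
    hdr = formatdate(timeval=mtime, localtime=False, usegmt=True)
    # hdr == 'Tue, 14 Nov 2023 22:13:20 GMT', sent as If-Modified-Since;
    # a 200 reply's Last-Modified header is parsed back the same way:
    ts = parsedate_to_datetime(hdr).timestamp()   # 1700000000.0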
diff --git a/webmap-download-mrr.py b/webmap-download-mrr.py
index f839ac0..696e46c 100644
--- a/webmap-download-mrr.py
+++ b/webmap-download-mrr.py
@@ -538,7 +538,7 @@ PATH_RE = re.compile(r'^M\s*' +
POINT0_RE = re.compile(r'^M\s*(' + FLOAT_RE + r')\s+(' + FLOAT_RE + r')\s+')
POINT_RE = re.compile(r'^L\s*(' + FLOAT_RE + r')\s+(' + FLOAT_RE + r')(?:\s+|$)')
-def download(dl, dest, dir_fd=None, headers={}, session=requests, progress=None):
+def download(dest, dl, dir_fd=None, headers={}, session=requests, progress=None):
dest_path = Path(dest)
if dest_path.suffix.lower() != '.geojson':
# mostly to check that nothing ends in .svg
diff --git a/webmap-import b/webmap-import
index 9f9fdca..13bdcdc 100755
--- a/webmap-import
+++ b/webmap-import
@@ -46,13 +46,13 @@ import osgeo.gdalconst as gdalconst
gdal.UseExceptions()
import common
-from common import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, escapeIdentifier
+from common import gdalSetOpenExArgs, gdalGetMetadataItem, gdalVersionMin, escape_identifier
# Open and return the output DS. It is created if create=False or
# create-options is a non-empty dictionary.
def openOutputDS(def_dict):
path = def_dict['path']
- kwargs, drv = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
+ kwargs, drv = gdalSetOpenExArgs(gdal, def_dict, flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
try:
logging.debug('OpenEx(%s, %s)', path, str(kwargs))
return gdal.OpenEx(path, **kwargs)
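A sketch of what the helper hands to gdal.OpenEx() for a GeoPackage target; treating 'format' as the key that selects the driver is an assumption about def_dict's layout, and the path is an example:

    kwargs, drv = gdalSetOpenExArgs(gdal, {'format': 'GPKG'},
                                    flags=GDAL_OF_UPDATE|GDAL_OF_VERBOSE_ERROR)
    # kwargs carries nOpenFlags and allowed_drivers=['GPKG'], plus any open options.
    ds = gdal.OpenEx('webmap.gpkg', **kwargs)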
@@ -321,13 +321,13 @@ def setFieldIf(cond, attrName, val, data, fldName, drvName, log=logging.warning)
# constraints.)
def validateSchema(layers, drvo=None, lco_defaults=None):
# Cf. https://github.com/OSGeo/gdal/blob/master/NEWS.md
- if gdalVersionMin(maj=3, min=7):
+ if gdalVersionMin(gdal, maj=3, min=7):
# list of capability flags supported by the CreateField() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_CREATION_FIELD_DEFN_FLAGS)
drvoFieldDefnFlags = drvoFieldDefnFlags.split(' ') if drvoFieldDefnFlags is not None else []
drvoSupportsFieldComment = 'Comment' in drvoFieldDefnFlags
# GetTZFlag()/SetTZFlag() and OGR_TZFLAG_* constants added in 3.8.0
- hasTZFlagSupport = gdalVersionMin(maj=3, min=8)
+ hasTZFlagSupport = gdalVersionMin(gdal, maj=3, min=8)
else:
# list of flags supported by the OGRLayer::AlterFieldDefn() API
drvoFieldDefnFlags = drvo.GetMetadataItem(gdalconst.DMD_ALTER_FIELD_DEFN_FLAGS)
@@ -726,7 +726,7 @@ def clearLayer(ds, lyr):
if n == 0:
# nothing to clear, we're good
return
- layername_esc = escapeIdentifier(lyr.GetName())
+ layername_esc = escape_identifier(lyr.GetName())
# XXX GDAL <3.9 doesn't have lyr.GetDataset() so we pass the DS along with the layer
drv = ds.GetDriver()
@@ -849,7 +849,7 @@ def setFieldMapValue(fld, idx, val):
# while we want a single transaction for the entire desination layer,
# including truncation, source imports, and metadata changes.
def importSource2(lyr_dst, path, args={}, basedir=None, extent=None):
- kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
+ kwargs, _ = gdalSetOpenExArgs(gdal, args, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
path2 = path if basedir is None else str(basedir.joinpath(path))
logging.debug('OpenEx(%s, %s)', path2, str(kwargs))
@@ -1121,13 +1121,13 @@ if __name__ == '__main__':
if args.debug > 1:
gdal.ConfigurePythonLogging(enable_debug=True)
- common.load_config(groupnames=None if args.groupname == [] else args.groupname)
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
# validate configuration
- if 'dataset' not in common.config:
+ if 'dataset' not in config:
raise Exception('Configuration does not specify output dataset')
- layers = common.config.get('layers', {})
+ layers = config.get('layers', {})
for layername, layerdefs in layers.items():
for idx, layerdef in enumerate(layerdefs['sources']):
importdef = layerdef.get('import', None)
@@ -1140,7 +1140,7 @@ if __name__ == '__main__':
ds_srcpath = importdef.get('path', None)
if src is None and unar is None and ds_srcpath is not None:
- # fallback to importe:path if there is no unarchiving receipe
+ # fall back to import:path if there is no unarchiving recipe
src = Path(ds_srcpath)
if unar is not None and ds_srcpath is None:
raise Exception(f'Output layer "{layername}" source #{idx} has no import source path')
@@ -1149,20 +1149,20 @@ if __name__ == '__main__':
layerdef['source'] = { 'path': src, 'unar': unar }
# set global GDAL/OGR configuration options
- for pszKey, pszValue in common.config.get('GDALconfig', {}).items():
+ for pszKey, pszValue in config.get('GDALconfig', {}).items():
logging.debug('gdal.SetConfigOption(%s, %s)', pszKey, pszValue)
gdal.SetConfigOption(pszKey, pszValue)
# open output dataset (possibly create it first)
- dso = openOutputDS(common.config['dataset'])
+ dso = openOutputDS(config['dataset'])
validateSchema(layers,
drvo=dso.GetDriver(),
- lco_defaults=common.config['dataset'].get('create-layer-options', None))
+ lco_defaults=config['dataset'].get('create-layer-options', None))
# get configured Spatial Reference System and extent
- srs = common.getSRS(common.config.get('SRS', None))
- extent = common.getExtent(common.config.get('extent', None), srs=srs)[0]
+ srs = common.getSRS(osr, config.get('SRS', None))
+ extent = common.getExtent(config.get('extent', None), srs=srs)[0]
if args.lockfile is not None:
# obtain an exclusive lock and don't release it until exiting the program
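A sketch of that lock acquisition, using the same primitives as webmap-download (the lockfile path is an example):

    import os
    from fcntl import flock, LOCK_EX

    lock_fd = os.open('/run/lock/webmap-import.lck',
                      os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, mode=0o644)
    flock(lock_fd, LOCK_EX)   # held until the process exits and the fd is closed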
diff --git a/webmap-publish b/webmap-publish
index 13b432e..8db920b 100755
--- a/webmap-publish
+++ b/webmap-publish
@@ -45,9 +45,7 @@ import common
from common import (
format_bytes,
gdalSetOpenExArgs,
- gdalVersionMin,
- gdalGetMetadataItem,
- escapeIdentifier
+ escape_identifier
)
# Open and return source dataset.
@@ -58,7 +56,7 @@ def openSourceDS(def_dict):
if 'open-options' not in def_dict:
def_dict['open-options'] = {}
def_dict['open-options'] |= opts2
- kwargs, _ = gdalSetOpenExArgs(def_dict, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
+ kwargs, _ = gdalSetOpenExArgs(gdal, def_dict, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR)
path = def_dict['path']
logging.debug('OpenEx(%s, %s)', path, str(kwargs))
@@ -98,7 +96,7 @@ def createMVT(basedir, options=None):
'tiling-scheme': 'TILING_SCHEME',
'extent': 'EXTENT',
}
- defaults_options = common.config.get('vector-tiles', None)
+ defaults_options = config.get('vector-tiles', None)
if defaults_options is not None:
for k, v in defaults_options.items():
if k not in defaults_map:
@@ -152,7 +150,7 @@ def getSourceLayer(ds_src, layerdef, extent=None):
logging.warning('Source layer "%s" has %d != 1 geometry fields', lyr_src.GetName(), geomFieldCount)
geomField = defn.GetGeomFieldDefn(0)
geomType = geomField.GetType()
- geomFieldName_esc = escapeIdentifier(geomField.GetName())
+ geomFieldName_esc = escape_identifier(geomField.GetName())
# transform extent to source SRS
if extent is None or extent.GetSpatialReference().IsSame(srs_src):
@@ -179,7 +177,7 @@ def getSourceLayer(ds_src, layerdef, extent=None):
fldType = fld.GetType()
if fld.GetName().lower() in reserved_fields:
raise Exception(f'Invalid column name "{fld.GetName()}"')
- fldName_esc = escapeIdentifier(fld.GetName())
+ fldName_esc = escape_identifier(fld.GetName())
column = 'm.' + fldName_esc
# for date/time/datetime fields, we let the RDBMS engine do the formatting if possible
if fldType == ogr.OFTDate:
@@ -221,7 +219,7 @@ def getSourceLayer(ds_src, layerdef, extent=None):
else:
raise Exception(f'Unsupported geometry transformation: {transform_geometry}')
- query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escapeIdentifier(lyr_src.GetName()) + ' m'
+ query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(lyr_src.GetName()) + ' m'
# add WHERE clauses and spatial filter; for GPKG/SQlite3, the query
# is too complex and the driver can't use ExecuteSQL()'s spatialFilter
@@ -475,15 +473,15 @@ if __name__ == '__main__':
logging.error('Could not infer --name value')
exit(1)
- common.load_config(groupnames=None if args.groupname == [] else args.groupname)
+ config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
# validate configuration
- if 'dataset' not in common.config:
+ if 'dataset' not in config:
raise Exception('Configuration does not specify source dataset')
export_layers = {}
mvtconf = {}
- for layername, layerdef in common.config.get('layers', {}).items():
+ for layername, layerdef in config.get('layers', {}).items():
exportdef = layerdef.get('publish', None)
if exportdef is None:
raise Exception(f'Layer "{layername}" has no publication definition')
@@ -495,7 +493,7 @@ if __name__ == '__main__':
if export_layername in export_layers:
raise Exception(f'Duplicate definition for {export_layername}')
x = {}
- for k in ['target_name', 'minzoom', 'maxzoom']:
+ for k in ('target_name', 'minzoom', 'maxzoom'):
if k in export_layerdef:
x[k] = export_layerdef[k]
mvtconf[export_layername] = x
@@ -509,13 +507,13 @@ if __name__ == '__main__':
pass
# open source DS
- ds = openSourceDS(common.config['dataset'])
+ ds = openSourceDS(config['dataset'])
# get configured Spatial Reference System and extent; force traditional GIS order
# on the target SRS since it's what MVT and JSON export are expecting
- srs = common.getSRS(common.config.get('SRS', None))
+ srs = common.getSRS(osr, config.get('SRS', None))
srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
- extent, extent_simple = common.getExtent(common.config.get('extent', None), srs=srs)
+ extent, extent_simple = common.getExtent(config.get('extent', None), srs=srs)
if srs.IsProjected():
# will export with centimeter precision
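For reference, the shape of the SELECT statement assembled via escape_identifier() above, with hypothetical column and layer names:

    columns = ['m.' + escape_identifier('objekt_id'),
               'm.' + escape_identifier('geom')]
    query = ('SELECT ' + ', '.join(columns) +
             ' FROM ' + escape_identifier('naturreservat') + ' m')
    # SELECT m."objekt_id", m."geom" FROM "naturreservat" m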