#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (download common layers)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme
# pylint: enable=invalid-name

from os import (
    O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE,
    path as os_path,
    curdir as os_curdir,
    pardir as os_pardir,
    sep as os_sep,
)
import os
import sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from typing import Optional, NoReturn
import requests

import common
from common import BadConfiguration, getSourcePathLockFileName

def download_trystream(url : str, **kwargs) -> requests.Response:
    """GET a URL, retrying a number of times on timeouts and connection
    errors.  Thanks to stream=True the response is returned as soon as
    the headers are received; the body is consumed by the caller."""
    max_retries = kwargs.pop('max_retries', 10)
    f = kwargs.pop('session', None)
    if f is None:
        f = requests
    for i in itertools.count(1):
        try:
            r = f.get(url, **kwargs, stream=True)
        except (requests.Timeout, requests.ConnectionError) as e:
            if i < max_retries:
                # retry immediately (no backoff) until max_retries is reached
                logging.error('%s (attempt %d of %d)', e, i, max_retries)
                continue
            raise
        r.raise_for_status()
        return r

class DownloadTooLarge(Exception):
    """Exception raised when a downloaded file exceeds max-size"""
    def __init__(self, max_size : int) -> None:
        super().__init__(f'Payload exceeds max-size ({max_size})')
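
# A minimal usage sketch for download_trystream(); the URL and filename are
# placeholders, and reusing a requests.Session is optional (it merely enables
# connection pooling across calls):
#
#   with requests.Session() as s:
#       r = download_trystream('https://example.org/layers/data.zip',
#                              session=s, max_retries=3, timeout=30)
#       with open('data.zip', 'wb') as fp:
#           for chunk in r.iter_content(chunk_size=2**16):
#               fp.write(chunk)
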
# pylint: disable-next=dangerous-default-value
def download(dest : str, dl : Optional[dict[str, str|int]],
             dir_fd : Optional[int] = None,
             headers : dict[str, str] = {},
             session : Optional[requests.sessions.Session] = None,
             progress = None) -> None:
    """Process a single download recipe"""

    url = None if dl is None else dl.get('url', None)
    if url is None:
        logging.error('%s has no source URL, ignoring', dest)
        return
    max_size = dl.get('max-size', 2**26) # 64MiB
    logging.info('Downloading %s…', url)

    dest_path = Path(dest)
    dest_tmp = str(dest_path.with_stem(f'.{dest_path.stem}.new'))
    try:
        # delete any leftover
        os.unlink(dest_tmp, dir_fd=dir_fd)
    except FileNotFoundError:
        pass

    start = time_monotonic()
    r = download_trystream(url, headers=headers, session=session, timeout=30)
    if r.status_code == 304:
        logging.info('%s: %d Not Modified', dest, r.status_code)
        return

    body_size = r.headers.get('Content-Length', None)
    last_modified = r.headers.get('Last-Modified', None)
    if last_modified is not None:
        try:
            last_modified = parsedate_to_datetime(last_modified)
            last_modified = last_modified.timestamp()
        except ValueError:
            logging.exception('Could not parse Last-Modified value')
            last_modified = None

    size = 0
    pbar = None

    # XXX we can't use TemporaryFile as it uses O_EXCL, cf.
    # https://discuss.python.org/t/temporaryfile-contextmanager-that-allows-creating-a-directory-entry-on-success/19094/2
    fd = os.open(os_path.dirname(dest), O_WRONLY|O_CLOEXEC|O_TMPFILE, mode=0o644, dir_fd=dir_fd)
    try:
        if progress is not None:
            pbar = progress(
                total=int(body_size) if body_size is not None else float('inf'),
                leave=False,
                unit_scale=True,
                unit_divisor=1024,
                unit='B'
            )
        with os.fdopen(fd, mode='wb', closefd=False) as fp:
            for chunk in r.iter_content(chunk_size=2**16):
                chunk_size = len(chunk)
                if pbar is not None:
                    pbar.update(chunk_size)
                size += chunk_size
                if max_size is not None and size > max_size:
                    raise DownloadTooLarge(max_size)
                fp.write(chunk)
        r = None
        if last_modified is not None:
            os.utime(fd, times=(last_modified, last_modified))
        # XXX unfortunately there is no way for linkat() to clobber the
        # destination, so we link the anonymous file to a temporary name first;
        # it's racy, but thanks to O_TMPFILE the race is shorter than if we
        # were dumping chunks into a named file
        os.link(f'/proc/self/fd/{fd}', dest_tmp, dst_dir_fd=dir_fd, follow_symlinks=True)
    finally:
        os.close(fd)
        if pbar is not None:
            pbar.close()

    try:
        # atomic rename (ensures output is never partially written)
        os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
    except (OSError, ValueError) as e:
        try:
            os.unlink(dest_tmp, dir_fd=dir_fd)
        finally:
            raise e

    elapsed = time_monotonic() - start
    logging.info('%s: Downloaded %s in %s (%s/s)', dest, common.format_bytes(size),
                 common.format_time(elapsed), common.format_bytes(int(size/elapsed)))

def _check_key_type(k : str, v : str|int, known_keys : list[tuple[type, set[str]]]) -> bool:
    for t, ks in known_keys:
        if k in ks and isinstance(v, t):
            return True
    return False

def parse_config_dl(downloads) -> dict[str, dict[str, str|int]]:
    """Parse and validate the "downloads" section from the configuration
    dictionary"""
    if not isinstance(downloads, list):
        raise BadConfiguration(f'Invalid download recipe: {downloads}')
    known_keys = [
        (str, {'path', 'url'}),
        (int, {'max-age', 'max-size'})
    ]
    destinations = {}
    known_keys_set = {k for _, ks in known_keys for k in ks}

    for dl in downloads:
        if 'url' in dl:
            dls = [dl]
        elif 'basedir' in dl and 'baseurl' in dl and 'files' in dl and 'path' not in dl:
            # expand a base recipe into one recipe per file
            dls = []
            for filename in dl['files']:
                dl2 = {
                    'path' : os_path.join(dl['basedir'], filename),
                    'url' : dl['baseurl'] + filename
                }
                for k, v in dl.items():
                    if k not in ('basedir', 'baseurl', 'files'):
                        dl2[k] = v
                dls.append(dl2)
        else:
            raise BadConfiguration(f'Invalid download recipe: {dl}')

        for dl in dls:
            path = dl.get('path', None)
            if path is None or path in ('', os_curdir, os_pardir) or path.endswith(os_sep):
                raise BadConfiguration(f'Invalid destination path "{path}"')
            if path in destinations:
                raise BadConfiguration(f'Duplicate download recipe for "{path}"')
            dl2 = {}
            for k, v in dl.items():
                if k == 'path':
                    continue
                if k not in known_keys_set:
                    logging.warning('Ignoring unknown setting "%s" in download recipe for "%s"',
                                    k, path)
                elif not _check_key_type(k, v, known_keys):
                    logging.warning('Ignoring setting "%s" in download recipe for "%s"'
                                    ' (invalid type)', k, path)
                else:
                    dl2[k] = v
            destinations[path] = dl2
    return destinations
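
# A worked example of the recipe expansion above (illustrative values only,
# POSIX path separator assumed): a recipe with 'basedir', 'baseurl' and
# 'files' yields one destination per file, with the remaining settings
# copied over and the 'path' key dropped from each value.
#
#   >>> parse_config_dl([{
#   ...     'basedir': 'mrr', 'baseurl': 'https://example.org/dl/',
#   ...     'files': ['a.zip', 'b.zip'], 'max-age': 86400,
#   ... }])
#   {'mrr/a.zip': {'url': 'https://example.org/dl/a.zip', 'max-age': 86400},
#    'mrr/b.zip': {'url': 'https://example.org/dl/b.zip', 'max-age': 86400}}
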
# pylint: disable-next=missing-function-docstring
def main() -> NoReturn:
    common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
    parser = argparse.ArgumentParser(description='Download or update GIS layers.')
    parser.add_argument('--cachedir', default=os_curdir,
                        help=f'destination directory for downloaded files (default: {os_curdir})')
    parser.add_argument('--lockdir', default=None,
                        help='optional directory for lock files')
    parser.add_argument('--quiet', action='store_true',
                        help='skip progress bars even when stderr is a TTY')
    parser.add_argument('--debug', action='count', default=0,
                        help=argparse.SUPPRESS)
    parser.add_argument('--exit-code', default=True, action=argparse.BooleanOptionalAction,
                        help='whether to exit with status 1 in case of download failures')
    parser.add_argument('--force', default=False, action='store_true',
                        help='always download regardless of age')
    parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
    args = parser.parse_args()

    if args.debug > 0:
        # pylint: disable=duplicate-code
        logging.getLogger().setLevel(logging.DEBUG)
        if args.debug > 1:
            from http.client import HTTPConnection # pylint: disable=import-outside-toplevel
            HTTPConnection.debuglevel = 1
            requests_log = logging.getLogger('urllib3')
            requests_log.setLevel(logging.DEBUG)
            requests_log.propagate = True

    config = common.parse_config(groupnames=None if args.groupname == [] else args.groupname)
    downloads = parse_config_dl(config.get('downloads', []))

    rv = 0
    download_paths = set()
    for layername, layerdef in config.get('layers', {}).items():
        sources = layerdef.get('sources', None)
        if sources is None or len(sources) < 1:
            logging.warning('Layer "%s" has no source, ignoring', layername)
            continue
        for idx, source in enumerate(sources):
            if 'source' not in source:
                continue
            source = source['source']
            path = None if source is None else source.get('path', None)
            if path is None:
                logging.error('Source #%d of layer "%s" has no path, ignoring', idx, layername)
                rv = 1
            elif path not in downloads:
                logging.warning('Ignoring unknown source of path "%s" from layer "%s"',
                                path, layername)
            else:
                download_paths.add(path)

    if args.quiet or not sys.stderr.isatty():
        pbar = None
    else:
        from tqdm import tqdm # pylint: disable=import-outside-toplevel
        pbar = tqdm

    # intentionally leave the dirfd open until the program terminates
    opendir_args = O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY
    destdir_fd = os.open(args.cachedir, opendir_args)
    lockdir_fd = None if args.lockdir is None else os.open(args.lockdir, opendir_args)
    session_requests = requests.Session()
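
    # Each destination is processed with the same protocol: create its parent
    # directories, take an exclusive flock() on a per-path lockfile (when
    # --lockdir is given), skip the download if the cached copy is younger
    # than max-age, and otherwise issue a conditional GET and let download()
    # replace the file atomically.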
    for dest in download_paths:
        dl = downloads[dest]
        headers = {}
        user_agent = config.get('User-Agent', None)
        if user_agent is not None:
            headers['User-Agent'] = user_agent
        try:
            # create parent directories
            destdir = os_path.dirname(dest)
            common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True)

            # place an exclusive lock on a lockfile, as the destination can be
            # used by other layers and hence might be updated in parallel
            if lockdir_fd is not None:
                umask = os.umask(0o002)
                lockfile = getSourcePathLockFileName(dest)
                try:
                    # use O_TRUNC to bump lockfile's mtime
                    lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC,
                                      mode=0o664, dir_fd=lockdir_fd)
                finally:
                    os.umask(umask)
            try:
                if lockdir_fd is not None:
                    logging.debug('flock("%s", LOCK_EX)', lockfile)
                    flock(lock_fd, LOCK_EX)
                try:
                    st = os.stat(dest, dir_fd=destdir_fd)
                except (OSError, ValueError):
                    # the file doesn't exist, or stat() failed for some reason
                    pass
                else:
                    if not args.force:
                        max_age = dl.get('max-age', 6*3600) # 6h
                        if max_age is not None:
                            # e.g. with the default max-age of 6h, a file
                            # touched 2h ago yields s ≈ 4h > 0 and is skipped
                            s = max_age + max(st.st_ctime, st.st_mtime) - time()
                            if s > 0:
                                logging.info('%s: Too young, try again in %s',
                                             dest, common.format_time(s))
                                continue
                        headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime,
                                                                  localtime=False, usegmt=True)
                download(dest, dl, dir_fd=destdir_fd, headers=headers,
                         session=session_requests, progress=pbar)
            finally:
                if lockdir_fd is not None:
                    os.close(lock_fd)
        except Exception: # pylint: disable=broad-exception-caught
            logging.exception('Could not download %s as %s', dl.get('url', '[N/A]'), dest)
            if args.exit_code:
                rv = 1
    sys.exit(rv)

if __name__ == '__main__':
    main()
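
# Example invocation, assuming this file is saved as download.py (the script
# name, directories and group names below are placeholders):
#
#   ./download.py --cachedir=/var/cache/layers --lockdir=/run/lock/layers \
#       --quiet --no-exit-code groupname1 groupname2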