#!/usr/bin/python3

"""Download or update the source files of GIS layers into a local cache.

Each configured layer source is fetched over HTTP into its cache path.  A
per-destination lock file serialises concurrent updates, files younger than
their ``max-age`` are left alone, and conditional requests
(``If-Modified-Since``) avoid re-downloading unchanged payloads.
"""

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE
import os
import sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from hashlib import sha1
from math import modf
import requests

import common


def download_trystream(url, **kwargs):
    """Issue a streaming GET request, retrying on timeouts and connection errors.

    The optional ``session`` keyword selects the object whose ``get()`` method
    is used (default: the ``requests`` module itself); all other keyword
    arguments are forwarded to ``get()``.

    Returns the response object, with its body not yet consumed; the caller
    is responsible for consuming or closing it.

    Raises ``requests.Timeout`` / ``requests.ConnectionError`` once the 10th
    attempt has failed, and ``requests.HTTPError`` (via ``raise_for_status()``)
    for error status codes.
    """
    max_tries = 10
    session = kwargs.pop('session', requests)
    for attempt in itertools.count(1):
        try:
            r = session.get(url, **kwargs, stream=True)
        except (requests.Timeout, requests.ConnectionError):
            if attempt >= max_tries:
                raise
            logging.error('timeout')
        else:
            try:
                r.raise_for_status()
            except requests.HTTPError:
                # a streamed response holds its connection until it is
                # consumed or closed, so release it before bailing out
                r.close()
                raise
            return r


def _parse_last_modified(value):
    """Convert a Last-Modified header value to a POSIX timestamp, or None."""
    if value is None:
        return None
    try:
        return parsedate_to_datetime(value).timestamp()
    except (TypeError, ValueError):
        # older interpreters raise TypeError rather than ValueError on
        # unparsable input
        logging.exception('Could not parse Last-Modified value')
        return None


def _stream_to_tmpfile(r, dest, dest_tmp, dir_fd, max_size, progress, start):
    """Write the body of response r to an anonymous file and link it as dest_tmp."""
    body_size = r.headers.get('Content-Length', None)

    # XXX we can't use TemporaryFile as it uses O_EXCL, cf.
    # https://discuss.python.org/t/temporaryfile-contextmanager-that-allows-creating-a-directory-entry-on-success/19094/2
    # dirname() is empty when dest has no directory component; fall back to
    # '.' (resolved relative to dir_fd) in that case
    fd = os.open(os.path.dirname(dest) or '.', O_WRONLY|O_CLOEXEC|O_TMPFILE,
                 mode=0o644, dir_fd=dir_fd)
    with os.fdopen(fd, mode='wb') as fp:
        size = 0

        if progress is not None:
            tot = int(body_size) if body_size is not None else float('inf')
            pbar = progress(total=tot, leave=False, unit_scale=True,
                            unit_divisor=1024, unit='B')
        else:
            pbar = None

        try:
            for chunk in r.iter_content(chunk_size=2**16):
                chunk_size = len(chunk)
                if pbar is not None:
                    pbar.update(chunk_size)
                size += chunk_size
                if max_size is not None and size > max_size:
                    raise Exception(f'Payload exceeds max-size ({max_size})')
                fp.write(chunk)
        finally:
            if pbar is not None:
                pbar.close()

        elapsed = time_monotonic() - start
        # guard against a zero elapsed time
        rate = int(size / elapsed) if elapsed > 0 else 0
        logging.info("%s: Downloaded %s in %s (%s/s)", dest,
                     format_bytes(size), format_time(elapsed), format_bytes(rate))

        # XXX unfortunately there is no way for linkat() to clobber the destination,
        # so we use a temporary file; it's racy, but thanks to O_TMPFILE better
        # (shorter race) than if we were dumping chunks in a named file descriptor
        os.link(f'/proc/self/fd/{fp.fileno()}', dest_tmp,
                dst_dir_fd=dir_fd, follow_symlinks=True)
    # no need to close fd here, it was taken care of by the context manager above


def download(url, dest, dir_fd=None, headers=None, max_size=None, session=requests, progress=None):
    """Download url and atomically install the payload as dest.

    Parameters:
      url:      URL to fetch.
      dest:     destination path (interpreted relative to dir_fd when given).
      dir_fd:   optional directory file descriptor paths are relative to.
      headers:  optional dict of request headers (default: none).
      max_size: optional upper bound, in bytes, on the payload size.
      session:  object providing ``get()`` (default: the ``requests`` module).
      progress: optional tqdm-like callable used to build a progress bar.

    On a 304 (Not Modified) reply the function returns without touching dest.
    Otherwise the body is streamed to an anonymous temporary file, linked
    under a temporary name next to dest, given the server's Last-Modified time
    as mtime when available, and finally renamed over dest.

    Raises an Exception when the payload exceeds max_size, and propagates
    errors from the HTTP layer and from the file system calls.
    """
    if headers is None:
        headers = {}

    logging.info('Downloading %s…', url)
    dest_path = Path(dest)
    dest_tmp = dest_path.with_stem(f'.{dest_path.stem}.new').as_posix()
    try:
        # delete any leftover
        os.unlink(dest_tmp, dir_fd=dir_fd)
    except FileNotFoundError:
        pass

    start = time_monotonic()
    r = download_trystream(url, headers=headers, session=session, timeout=30)
    try:
        if r.status_code == requests.codes.not_modified:
            # XXX shouldn't we call os.utime(dest) to bump its ctime here?
            # otherwise we'll make several queries and get multiple 304
            # replies if the file is used by multiple layers
            logging.info('%s: %d Not Modified', dest, r.status_code)
            return

        last_modified = _parse_last_modified(r.headers.get('Last-Modified', None))
        _stream_to_tmpfile(r, dest, dest_tmp, dir_fd, max_size, progress, start)
    finally:
        # always release the connection, including on the 304 and error paths
        r.close()

    try:
        if last_modified is not None:
            # XXX os.utime() doesn't work on file descriptors so we set mtime
            # after linkat() instead
            os.utime(dest_tmp, times=(last_modified, last_modified),
                     dir_fd=dir_fd, follow_symlinks=False)
        os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
    except Exception:
        # best-effort removal of the temporary name, then re-raise the
        # original error with its traceback intact
        try:
            os.unlink(dest_tmp, dir_fd=dir_fd)
        except OSError:
            pass
        raise


def format_bytes(n):
    """Format a byte count n with a binary unit suffix (B, kiB, MiB or GiB)."""
    if n < 768:
        return f'{n}B'
    if n < 786432:
        return f'{n/1024:.2f}kiB'
    if n < 805306368:
        return f'{n/1048576:.2f}MiB'
    return f'{n/1073741824:.2f}GiB'


def format_time(s):
    """Format a non-negative duration s, in seconds, as HH:MM:SS.mmm."""
    # round to milliseconds *before* splitting so that e.g. 59.9996 yields
    # 00:01:00.000 rather than 00:00:60.000
    fs, s = modf(round(s, 3))
    m, s = divmod(int(s), 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'


def main():
    """Parse the command line and download every configured layer source.

    Returns the process exit status: 1 if --exit-code was given and at least
    one download failed, 0 otherwise.
    """
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

    parser = argparse.ArgumentParser(description='Download or update GIS layers.')
    parser.add_argument('--cachedir', default=None,
        help='destination directory for downloaded files (default: .)')
    parser.add_argument('--lockdir', default=None,
        help='directory for lock files (default: value of --cachedir option)')
    parser.add_argument('--debug', action='store_true', help=argparse.SUPPRESS)
    parser.add_argument('--exit-code', action=argparse.BooleanOptionalAction,
        help='whether to exit with status 1 in case of download failures')
    parser.add_argument('groupname', nargs='*', help='Group(s) to process')
    args = parser.parse_args()

    if args.debug:
        from http.client import HTTPConnection
        HTTPConnection.debuglevel = 1
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True

    # NOTE(review): presumably populates common.config — confirm in common.py
    common.load_config(groupnames=None if args.groupname == [] else args.groupname)

    # flatten the per-layer definitions into a list of source definitions,
    # each tagged with the name of the layer it belongs to
    sources = []
    for name, layerdefs in common.config.get('layers', {}).items():
        for layerdef in layerdefs:
            sourcedef = layerdef.get('source', {})
            sourcedef['layername'] = name
            sources.append(sourcedef)

    # only show a progress bar on interactive terminals
    if sys.stderr.isatty():
        from tqdm import tqdm
        pbar = tqdm
    else:
        pbar = None

    # intentionally leave the dirfd open until the program terminates
    opendir_args = O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY
    destdir_fd = None if args.cachedir is None else os.open(args.cachedir, opendir_args)
    lockdir_fd = destdir_fd if args.lockdir is None else os.open(args.lockdir, opendir_args)

    if lockdir_fd == destdir_fd:
        # either both unset, or --lockdir was omitted and defaults to --cachedir
        lockdir_is_destdir = True
    else:
        # lockdir_fd is necessarily set here; without --cachedir the
        # destination paths are relative to the current directory
        destdir_st = os.stat('.') if destdir_fd is None else os.fstat(destdir_fd)
        lockdir_st = os.fstat(lockdir_fd)
        # compare device *and* inode numbers
        lockdir_is_destdir = os.path.samestat(destdir_st, lockdir_st)

    session_requests = requests.Session()
    script_name = os.path.basename(__file__)
    user_agent = common.config.get('User-Agent', None)

    rv = 0
    downloads = set()
    for source in sources:
        dl = source.get('download', None)
        script = None if dl is None else dl.get('script', None)
        if script is not None and script != script_name:
            logging.info('Layer "%s" is not for us (%s != %s), skipping',
                         source['layername'], script, script_name)
            continue

        url = None if dl is None else dl.get('url', None)
        if url is None:
            logging.error('Layer "%s" has no source URL, ignoring', source['layername'])
            continue

        cache = source.get('cache', None)
        dest = None if cache is None else cache.get('path', None)
        if dest is None:
            raise Exception('Impossible')
        elif url in downloads:
            logging.info('%s was already downloaded, skipping', dest)
            continue

        # fresh dict per source: If-Modified-Since is added below
        headers = {}
        if user_agent is not None:
            headers['User-Agent'] = user_agent

        dest = str(dest) # convert from Path()
        try:
            # create parent directories
            destdir = os.path.dirname(dest)
            common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True, logging=logging)

            # place an exclusive lock on a lockfile as the destination can be used by other layers
            # hence might be updated in parallel
            if lockdir_is_destdir:
                lockfile = dest + '.lck'
            else:
                # use a flat hierarchy when lockdir != destdir as this avoids leaving empty directories
                # behind when removing left overs with tmpfiles.d(5)
                lockfile = sha1(dest.encode('utf-8')).hexdigest() + '.lck'
            # use O_TRUNC to bump lockfile's mtime
            lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC,
                              mode=0o644, dir_fd=lockdir_fd)
            try:
                logging.debug('flock("%s", LOCK_EX)', lockfile)
                flock(lock_fd, LOCK_EX)
                try:
                    st = os.stat(dest, dir_fd=destdir_fd)
                except (OSError, ValueError):
                    # the file doesn't exist, or stat() failed for some reason
                    pass
                else:
                    max_age = cache.get('max-age', 6*3600) # 6h
                    if max_age is not None:
                        s = max_age + max(st.st_ctime, st.st_mtime) - time()
                        if s > 0:
                            logging.info('%s: Too young, try again in %s', dest, format_time(s))
                            continue
                    headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime,
                                                              localtime=False, usegmt=True)
                max_size = dl.get('max-size', 2**26) # 64MiB
                download(url, dest, dir_fd=destdir_fd, max_size=max_size,
                         headers=headers, session=session_requests, progress=pbar)
                downloads.add(url)
            finally:
                # closing the descriptor also drops the flock() lock
                os.close(lock_fd)
        except Exception:
            logging.exception('Could not download %s as %s', url, dest)
            if args.exit_code:
                rv = 1
    return rv


if __name__ == '__main__':
    # sys.exit() rather than the site-provided exit() builtin
    sys.exit(main())