aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-download
blob: c8c417852491ce662b0030c1d5ba1affdcabdd96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/usr/bin/python3

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE
import os, sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from hashlib import sha1
from math import modf
import requests

import common

def download_trystream(url, **kwargs):
    """GET `url` with streaming enabled, retrying on transient network errors.

    Retries up to `max_tries` times on requests.Timeout and
    requests.ConnectionError; any other exception — including HTTP error
    statuses surfaced by raise_for_status() — propagates immediately.

    The optional 'session' keyword selects the requests session to use
    (defaults to the `requests` module itself); all remaining keyword
    arguments are forwarded to get().  Returns the status-checked Response.
    """
    max_tries = 10
    session = kwargs.pop('session', requests)
    for attempt in itertools.count(1):
        try:
            r = session.get(url, **kwargs, stream=True)
        except (requests.Timeout, requests.ConnectionError) as exc:
            if attempt < max_tries:
                # log the actual error class: a ConnectionError is not a timeout
                logging.error('%s', type(exc).__name__)
                continue
            raise
        else:
            r.raise_for_status()
            return r

def download(dl, dest, dir_fd=None, headers=None, session=requests, progress=None):
    """Download dl['url'] and atomically install it as `dest`.

    `dl` is the 'download' section of a source definition (a dict, or None
    when the source has no such section); `dest` is a path interpreted
    relative to `dir_fd`.  Extra request `headers` (e.g. If-Modified-Since)
    may be supplied; a 304 Not Modified reply leaves `dest` untouched.
    Transfers larger than dl['max-size'] (default 64MiB) are aborted.
    `progress`, when not None, is a tqdm-compatible progress-bar factory.

    The payload is written to an anonymous O_TMPFILE file, linked into the
    directory under a hidden temporary name, given the server's
    Last-Modified timestamp, then renamed over `dest`, so readers never
    observe a partially-written file.

    NOTE: the first parameter used to be (mis)named `url`; the function in
    fact receives the whole download definition and previously relied on a
    same-named global being set by the caller.  It is now named `dl` to
    match what callers actually pass.
    """
    url = None if dl is None else dl.get('url', None)
    if url is None:
        logging.error('%s has no source URL, ignoring', dest)
        return
    if headers is None:
        # don't use a mutable default argument: it would be shared
        # across calls
        headers = {}
    max_size = dl.get('max-size', 2**26) # 64MiB
    logging.info('Downloading %s…', url)
    destPath = Path(dest)
    dest_tmp = destPath.with_stem(f'.{destPath.stem}.new').as_posix()
    try:
        # delete any leftover
        os.unlink(dest_tmp, dir_fd=dir_fd)
    except FileNotFoundError:
        pass

    start = time_monotonic()
    r = download_trystream(url, headers=headers, session=session, timeout=30)
    if r.status_code == requests.codes.not_modified:
        # XXX shouldn't we call os.utime(dest) to bump its ctime here?
        # otherwise we'll make several queries and get multiple 304
        # replies if the file is used by multiple layers
        logging.info('%s: %d Not Modified', dest, r.status_code)
        return

    body_size = r.headers.get('Content-Length', None)
    last_modified = r.headers.get('Last-Modified', None)
    if last_modified is not None:
        try:
            last_modified = parsedate_to_datetime(last_modified)
            last_modified = last_modified.timestamp()
        except ValueError:
            logging.exception('Could not parse Last-Modified value')
            last_modified = None

    # XXX we can't use TemporaryFile as it uses O_EXCL, cf.
    # https://discuss.python.org/t/temporaryfile-contextmanager-that-allows-creating-a-directory-entry-on-success/19094/2
    fd = os.open(os.path.dirname(dest), O_WRONLY|O_CLOEXEC|O_TMPFILE, mode=0o644, dir_fd=dir_fd)
    with os.fdopen(fd, mode='wb') as fp:
        size = 0

        if progress is not None:
            # Content-Length may be absent (chunked encoding): fall back
            # to an unbounded bar
            tot = int(body_size) if body_size is not None else float('inf')
            pbar = progress(total=tot, leave=False, unit_scale=True, unit_divisor=1024, unit='B')
        else:
            pbar = None

        try:
            for chunk in r.iter_content(chunk_size=2**16):
                chunk_size = len(chunk)
                if pbar is not None:
                    pbar.update(chunk_size)
                size = size + chunk_size
                if max_size is not None and size > max_size:
                    raise Exception(f'Payload exceeds max-size ({max_size})')
                fp.write(chunk)
        finally:
            if pbar is not None:
                pbar.close()

        # drop the reference so the response object can be reclaimed
        r = None
        elapsed = time_monotonic() - start
        logging.info("%s: Downloaded %s in %s (%s/s)", dest, format_bytes(size),
            format_time(elapsed), format_bytes(int(size/elapsed)))

        # XXX unfortunately there is no way for linkat() to clobber the destination,
        # so we use a temporary file; it's racy, but thanks to O_TMPFILE better
        # (shorter race) than if we were dumping chunks in a named file descriptor
        os.link(f'/proc/self/fd/{fp.fileno()}', dest_tmp,
            dst_dir_fd=dir_fd, follow_symlinks=True)

    # no need to close fd here, it was taken care of by the context manager above

    try:
        if last_modified is not None:
            # XXX os.utime() doesn't work on file descriptors so we set mtime
            # after linkat() instead
            os.utime(dest_tmp, times=(last_modified, last_modified),
                dir_fd=dir_fd, follow_symlinks=False)
        os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
    except Exception as e:
        # best-effort cleanup of the temporary link before re-raising
        try:
            os.unlink(dest_tmp, dir_fd=dir_fd)
        except Exception:
            pass
        raise e

def format_bytes(n):
    """Render a byte count as a short human-readable string.

    Binary (1024-based) units; the unit switches at 0.75 of the next
    multiple (768, 768Ki, 768Mi) so small fractional values are avoided.
    """
    if n < 768:
        return f'{n}B'
    for divisor, suffix in ((1024, 'kiB'), (1048576, 'MiB')):
        if n < divisor * 768:
            return f'{n/divisor:.2f}{suffix}'
    return f'{n/1073741824:.2f}GiB'

def format_time(s):
    """Render a duration in seconds as 'HH:MM:SS.mmm'."""
    whole = int(s)
    frac = s - whole
    minutes, seconds = divmod(whole, 60)
    hours, minutes = divmod(minutes, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds + frac:06.3f}'

if __name__ == '__main__':
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

    parser = argparse.ArgumentParser(description='Download or update GIS layers.')
    parser.add_argument('--cachedir', default=os.curdir,
        help=f'destination directory for downloaded files (default: {os.curdir})')
    parser.add_argument('--lockdir', default=None,
        help='optional directory for lock files')
    parser.add_argument('--debug', action='store_true', help=argparse.SUPPRESS)
    parser.add_argument('--exit-code', default=True, action=argparse.BooleanOptionalAction,
        help='whether to exit with status 1 in case of download failures')
    parser.add_argument('groupname', nargs='*', help='Group(s) to process')
    args = parser.parse_args()

    if args.debug:
        # enable wire-level tracing from http.client and urllib3
        from http.client import HTTPConnection
        HTTPConnection.debuglevel = 1
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True

    common.load_config(groupnames=None if args.groupname == [] else args.groupname)

    # flatten the configured layers into a list of source definitions,
    # tagging each with the layer it came from (used in error messages)
    sources = []
    for name, layerdefs in common.config.get('layers', {}).items():
        for layerdef in layerdefs:
            sourcedef = layerdef.get('source', {})
            sourcedef['layername'] = name
            sources.append(sourcedef)

    # only show progress bars when stderr is a terminal
    if sys.stderr.isatty():
        from tqdm import tqdm
        pbar = tqdm
    else:
        pbar = None

    # intentionally leave the dirfd open until the program terminates
    opendir_args = O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY
    destdir_fd = os.open(args.cachedir, opendir_args)
    lockdir_fd = None if args.lockdir is None else os.open(args.lockdir, opendir_args)

    sessionRequests = requests.Session()

    rv = 0
    downloads = set()
    for source in sources:
        dl = source.get('download', None)
        # a source may delegate fetching to a custom module named in
        # download.module; otherwise use the built-in download()
        dl_module = None if dl is None else dl.get('module', None)
        if dl_module is None:
            fetch = download
        else:
            dl_module = __import__(dl_module)
            fetch = dl_module.download

        cache = source.get('cache', None)
        dest = None if cache is None else cache.get('path', None)
        if dest is None:
            raise Exception('Impossible')
        elif dest in downloads:
            # several layers may share the same cache file; fetch it once
            logging.info('%s was already downloaded, skipping', dest)
            continue

        headers = {}
        user_agent = common.config.get('User-Agent', None)
        if user_agent is not None:
            headers['User-Agent'] = user_agent
        dest = str(dest) # convert from Path()

        try:
            # create parent directories
            destdir = os.path.dirname(dest)
            common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True, logging=logging)

            # place an exclusive lock on a lockfile as the destination can be used by other layers
            # hence might be updated in parallel
            if lockdir_fd is not None:
                lockfile = sha1(dest.encode('utf-8')).hexdigest() + '.lck'
                # use O_TRUNC to bump lockfile's mtime
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644, dir_fd=lockdir_fd)
            try:
                if lockdir_fd is not None:
                    logging.debug('flock("%s", LOCK_EX)', lockfile)
                    flock(lock_fd, LOCK_EX)
                try:
                    st = os.stat(dest, dir_fd=destdir_fd)
                except (OSError, ValueError):
                    # the file doesn't exist, or stat() failed for some reason
                    pass
                else:
                    # skip the download entirely while the cached copy is
                    # younger than cache.max-age
                    max_age = cache.get('max-age', 6*3600) # 6h
                    if max_age is not None:
                        s = max_age + max(st.st_ctime, st.st_mtime) - time()
                        if s > 0:
                            logging.info('%s: Too young, try again in %s',
                                dest, format_time(s))
                            continue
                    # otherwise let the server reply 304 when unchanged
                    headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime, localtime=False, usegmt=True)
                fetch(dl, dest, dir_fd=destdir_fd,
                    headers=headers, session=sessionRequests,
                    progress=pbar)
                downloads.add(dest)
            finally:
                if lockdir_fd is not None:
                    os.close(lock_fd)
        except Exception:
            # dl may legitimately be None (no 'download' section): guard
            # so the error handler itself can't raise AttributeError
            url = source['layername'] if dl is None else dl.get('url', source['layername'])
            logging.exception('Could not download %s as %s', url, dest)
            if args.exit_code:
                rv = 1
    sys.exit(rv)