aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-download
blob: 60403564aa1f5d94802fdf26bbe002cafddda11b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#!/usr/bin/python3

from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE
import os, sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from hashlib import sha1
from math import modf
import requests

import common

def download_trystream(url, **kwargs):
    max_tries = 10
    f = kwargs.pop('session', requests)
    for i in itertools.count(1):
        try:
            r = f.get(url, **kwargs, stream=True)
        except (requests.Timeout, requests.ConnectionError):
            if i < max_tries:
                logging.error('timeout')
                continue
            raise
        else:
            r.raise_for_status()
            return r

def download(url, dest, dir_fd=None, headers={}, session=requests, progress=None):
    url = None if dl is None else dl.get('url', None)
    if url is None:
        logging.error('%s has no source URL, ignoring', dest)
        return
    max_size = dl.get('max-size', 2**26) # 64MiB
    logging.info('Downloading %s…', url)
    destPath = Path(dest)
    dest_tmp = str(destPath.with_stem(f'.{destPath.stem}.new'))
    try:
        # delete any leftover
        os.unlink(dest_tmp, dir_fd=dir_fd)
    except FileNotFoundError:
        pass

    start = time_monotonic()
    r = download_trystream(url, headers=headers, session=session, timeout=30)
    if r.status_code == requests.codes.not_modified:
        # XXX shouldn't we call os.utime(dest) to bump its ctime here?
        # otherwise we'll make several queries and get multiple 304
        # replies if the file is used by multiple layers
        logging.info('%s: %d Not Modified', dest, r.status_code)
        return

    body_size = r.headers.get('Content-Length', None)
    last_modified = r.headers.get('Last-Modified', None)
    if last_modified is not None:
        try:
            last_modified = parsedate_to_datetime(last_modified)
            last_modified = last_modified.timestamp()
        except ValueError:
            logging.exception('Could not parse Last-Modified value')
            last_modified = None

    size = 0
    pbar = None

    # XXX we can't use TemporaryFile as it uses O_EXCL, cf.
    # https://discuss.python.org/t/temporaryfile-contextmanager-that-allows-creating-a-directory-entry-on-success/19094/2
    fd = os.open(os.path.dirname(dest), O_WRONLY|O_CLOEXEC|O_TMPFILE, mode=0o644, dir_fd=dir_fd)
    try:
        if progress is not None:
            pbar = progress(
                total=int(body_size) if body_size is not None else float('inf'),
                leave=False,
                unit_scale=True,
                unit_divisor=1024,
                unit='B'
            )
        with os.fdopen(fd, mode='wb', closefd=False) as fp:
            for chunk in r.iter_content(chunk_size=2**16):
                chunk_size = len(chunk)
                if pbar is not None:
                    pbar.update(chunk_size)
                size = size + chunk_size
                if max_size is not None and size > max_size:
                    raise Exception(f'Payload exceeds max-size ({max_size})')
                fp.write(chunk)
        r = None

        if last_modified is not None:
            os.utime(fd, times=(last_modified, last_modified), follow_symlinks=True)

        # XXX unfortunately there is no way for linkat() to clobber the destination,
        # so we use a temporary file; it's racy, but thanks to O_TMPFILE better
        # (shorter race) than if we were dumping chunks in a named file descriptor
        os.link(f'/proc/self/fd/{fd}', dest_tmp, dst_dir_fd=dir_fd, follow_symlinks=True)
    finally:
        os.close(fd)
        if pbar is not None:
            pbar.close()

    try:
        # atomic rename (ensures output is never partially written)
        os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
    except (OSError, ValueError) as e:
        try:
            os.unlink(dest_tmp, dir_fd=dir_fd)
        finally:
            raise e

    elapsed = time_monotonic() - start
    logging.info("%s: Downloaded %s in %s (%s/s)", dest, format_bytes(size),
        format_time(elapsed), format_bytes(int(size/elapsed)))

def format_bytes(n):
    if n < 768:
        return f'{n}B'
    elif n < 786432:
        return f'{n/1024:.2f}kiB'
    elif n < 805306368:
        return f'{n/1048576:.2f}MiB'
    else:
        return f'{n/1073741824:.2f}GiB'

def format_time(s):
    fs, s = modf(s)
    m, s = divmod(int(s), 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'

if __name__ == '__main__':
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

    parser = argparse.ArgumentParser(description='Download or update GIS layers.')
    parser.add_argument('--cachedir', default=os.curdir,
        help=f'destination directory for downloaded files (default: {os.curdir})')
    parser.add_argument('--lockdir', default=None,
        help='optional directory for lock files')
    parser.add_argument('--quiet', action='store_true',
        help='skip progress bars even when stderr is a TTY')
    parser.add_argument('--debug', action='store_true', help=argparse.SUPPRESS)
    parser.add_argument('--exit-code', default=True, action=argparse.BooleanOptionalAction,
        help='whether to exit with status 1 in case of download failures')
    parser.add_argument('groupname', nargs='*', help='Group(s) to process')
    args = parser.parse_args()

    if args.debug:
        from http.client import HTTPConnection
        HTTPConnection.debuglevel = 1
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True

    common.load_config(groupnames=None if args.groupname == [] else args.groupname)

    sources = []
    for name, layerdefs in common.config.get('layers', {}).items():
        for layerdef in layerdefs:
            sourcedef = layerdef.get('source', {})
            sourcedef['layername'] = name
            sources.append(sourcedef)

    if args.quiet or not sys.stderr.isatty():
        pbar = None
    else:
        from tqdm import tqdm
        pbar = tqdm

    # intentionally leave the dirfd open until the program terminates
    opendir_args = O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY
    destdir_fd = os.open(args.cachedir, opendir_args)
    lockdir_fd = None if args.lockdir is None else os.open(args.lockdir, opendir_args)

    sessionRequests = requests.Session()

    rv = 0
    downloads = set()
    for source in sources:
        dl = source.get('download', None)
        dl_module = None if dl is None else dl.get('module', None)
        if dl_module is None:
            fetch = download
        else:
            dl_module = __import__(dl_module)
            fetch = dl_module.download

        cache = source.get('cache', None)
        dest = None if cache is None else cache.get('path', None)
        if dest is None:
            raise Exception('Impossible')

        dest = str(dest) # convert from Path()
        if dest in downloads:
            logging.info('%s was already downloaded, skipping', dest)
            continue

        headers = {}
        user_agent = common.config.get('User-Agent', None)
        if user_agent is not None:
            headers['User-Agent'] = user_agent

        try:
            # create parent directories
            destdir = os.path.dirname(dest)
            common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True, logging=logging)

            # place an exclusive lock on a lockfile as the destination can be used by other layers
            # hence might be updated in parallel
            if lockdir_fd is not None:
                lockfile = sha1(dest.encode('utf-8')).hexdigest() + '.lck'
                # use O_TRUNC to bump lockfile's mtime
                lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644, dir_fd=lockdir_fd)
            try:
                if lockdir_fd is not None:
                    logging.debug('flock("%s", LOCK_EX)', lockfile)
                    flock(lock_fd, LOCK_EX)
                try:
                    st = os.stat(dest, dir_fd=destdir_fd)
                except (OSError, ValueError):
                    # the file doesn't exist, or stat() failed for some reason
                    pass
                else:
                    max_age = cache.get('max-age', 6*3600) # 6h
                    if max_age is not None:
                        s = max_age + max(st.st_ctime, st.st_mtime) - time()
                        if s > 0:
                            logging.info('%s: Too young, try again in %s',
                                dest, format_time(s))
                            continue
                    headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime, localtime=False, usegmt=True)
                fetch(dl, dest, dir_fd=destdir_fd,
                    headers=headers, session=sessionRequests,
                    progress=pbar)
                downloads.add(dest)
            finally:
                if lockdir_fd is not None:
                    os.close(lock_fd)
        except Exception:
            logging.exception('Could not download %s as %s',
                              dl.get('url', source['layername']), dest)
            if args.exit_code:
                rv = 1
    exit(rv)