#!/usr/bin/python3
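"""Download or update the source files of GIS layers.

Each layer source listed in the configuration (see common.load_config())
is fetched over HTTP into a local cache, subject to its max-age and
max-size settings, with If-Modified-Since revalidation and per-destination
lock files so that concurrent runs cooperate instead of clobbering each
other's downloads.
"""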
from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY, O_TMPFILE
import os, sys
from fcntl import flock, LOCK_EX
import logging
from time import time, monotonic as time_monotonic
import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
from hashlib import sha1
from math import modf
import requests
import common
def download_trystream(url, **kwargs):
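    """Issue a streaming GET request for `url`, retrying on network errors.

    Up to `max_tries` attempts are made on requests.Timeout or
    requests.ConnectionError before the exception is re-raised; an optional
    `session` keyword argument replaces the `requests` module as the
    requester.  The response is checked with raise_for_status() before
    being returned.
    """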
    max_tries = 10
    f = kwargs.pop('session', requests)
    for i in itertools.count(1):
        try:
            r = f.get(url, **kwargs, stream=True)
        except (requests.Timeout, requests.ConnectionError):
            if i < max_tries:
                logging.error('Connection failure or timeout (attempt %d/%d), retrying',
                              i, max_tries)
                continue
            raise
        else:
            r.raise_for_status()
            return r
def download(url, dest, dir_fd=None, headers=None, max_size=None, session=requests):
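    """Fetch `url` and atomically install the payload at `dest`.

    The body is streamed into an anonymous O_TMPFILE file, then linked in
    under a hidden temporary name and renamed over `dest`, so readers never
    observe a partial download.  On a 304 Not Modified reply the existing
    file is left untouched, and a payload larger than `max_size` (when not
    None) aborts the transfer.
    """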
    logging.info('Downloading %s…', url)
    dest_path = Path(dest)
    dest_tmp = dest_path.with_stem(f'.{dest_path.stem}.new').as_posix()
    try:
        # delete any leftover from a previous, interrupted run
        os.unlink(dest_tmp, dir_fd=dir_fd)
    except FileNotFoundError:
        pass

    start = time_monotonic()
    r = download_trystream(url, headers=headers, session=session, timeout=30)
    if r.status_code == requests.codes.not_modified:
        # XXX shouldn't we call os.utime(dest) to bump its ctime here?
        # otherwise we'll make several queries and get multiple 304
        # replies if the file is used by multiple layers
        logging.info('%s: %d Not Modified', dest, r.status_code)
        return

    last_modified = r.headers.get('Last-Modified', None)
    if last_modified is not None:
        try:
            last_modified = parsedate_to_datetime(last_modified).timestamp()
        except ValueError:
            logging.exception('Could not parse Last-Modified value')
            last_modified = None

    # XXX we can't use TemporaryFile as it uses O_EXCL, cf.
    # https://discuss.python.org/t/temporaryfile-contextmanager-that-allows-creating-a-directory-entry-on-success/19094/2
    fd = os.open(os.path.dirname(dest) or '.', O_WRONLY|O_CLOEXEC|O_TMPFILE,
                 mode=0o644, dir_fd=dir_fd)
    with os.fdopen(fd, mode='wb') as fp:
        size = 0
        for chunk in r.iter_content(chunk_size=2**16):
            size += len(chunk)
            if max_size is not None and size > max_size:
                raise Exception(f'Payload exceeds max-size ({max_size})')
            fp.write(chunk)
        r = None  # release our reference to the response
        end = time_monotonic()
        # XXX unfortunately there is no way for linkat() to clobber the destination,
        # so we use a temporary file; it's racy, but thanks to O_TMPFILE better
        # (shorter race) than if we were dumping chunks in a named file descriptor
        os.link(f'/proc/self/fd/{fp.fileno()}', dest_tmp,
                dst_dir_fd=dir_fd, follow_symlinks=True)
    # no need to close fd here, it was taken care of by the context manager above

    try:
        if last_modified is not None:
            # XXX os.utime() doesn't work on file descriptors so we set mtime
            # after linkat() instead
            os.utime(dest_tmp, times=(last_modified, last_modified),
                     dir_fd=dir_fd, follow_symlinks=False)
        os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
        elapsed = end - start
        logging.info('%s: Downloaded %s in %s (%s/s)', dest, format_bytes(size),
                     format_time(elapsed), format_bytes(int(size/elapsed)))
    except Exception:
        # don't leave the temporary file behind on failure
        try:
            os.unlink(dest_tmp, dir_fd=dir_fd)
        except Exception:
            pass
        raise
def format_bytes(n):
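    """Format a byte count for human consumption, e.g. 2097152 → '2.00MiB'."""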
    # the unit switches at 0.75× the next unit, so we never print awkward
    # values such as 1000.00kiB
    if n < 768:
        return f'{n}B'
    elif n < 786432:  # 768 * 1024
        return f'{n/1024:.2f}kiB'
    elif n < 805306368:  # 768 * 1024**2
        return f'{n/1048576:.2f}MiB'
    else:
        return f'{n/1073741824:.2f}GiB'
def format_time(s):
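    """Format a duration in seconds as HH:MM:SS.sss, e.g. 93.5 → '00:01:33.500'."""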
    fs, s = modf(s)
    m, s = divmod(int(s), 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s + fs:06.3f}'
if __name__ == '__main__':
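    # Entry point: parse the command line, load the configuration, then
    # refresh the cached copy of every matching layer source.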
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

    parser = argparse.ArgumentParser(description='Download or update GIS layers.')
    parser.add_argument('--cachedir', default=None,
        help='destination directory for downloaded files (default: .)')
    parser.add_argument('--lockdir', default=None,
        help='directory for lock files (default: value of --cachedir option)')
    parser.add_argument('--debug', action='store_true', help=argparse.SUPPRESS)
    parser.add_argument('--exit-code', action=argparse.BooleanOptionalAction,
        help='whether to exit with status 1 in case of download failures')
    parser.add_argument('groupname', nargs='*', help='group(s) to process')
    args = parser.parse_args()

    if args.debug:
        # also dump HTTP request/response headers
        from http.client import HTTPConnection
        HTTPConnection.debuglevel = 1
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger('urllib3')
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
    common.load_config(groupnames=None if args.groupname == [] else args.groupname)

    # flatten the per-layer definitions into a list of source definitions,
    # each tagged with the name of the layer it belongs to
    sources = []
    for name, layerdefs in common.config.get('layers', {}).items():
        for layerdef in layerdefs:
            sourcedef = layerdef.get('source', {})
            sourcedef['layername'] = name
            sources.append(sourcedef)
    # intentionally leave the dirfds open until the program terminates
    opendir_args = O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY
    destdir_fd = None if args.cachedir is None else os.open(args.cachedir, opendir_args)
    lockdir_fd = destdir_fd if args.lockdir is None else os.open(args.lockdir, opendir_args)
    if destdir_fd == lockdir_fd:
        # same fd, or both None (i.e., the current directory)
        lockdir_is_destdir = True
    else:
        # the two fds might still point at the same directory; compare
        # (st_dev, st_ino) pairs, as inode numbers are only unique within
        # a single filesystem
        destdir_st = os.stat('.') if destdir_fd is None else os.fstat(destdir_fd)
        lockdir_st = os.stat('.') if lockdir_fd is None else os.fstat(lockdir_fd)
        lockdir_is_destdir = (destdir_st.st_dev == lockdir_st.st_dev
                              and destdir_st.st_ino == lockdir_st.st_ino)
    session_requests = requests.Session()  # reuse one connection pool for all downloads
    rv = 0
    downloads = set()  # URLs successfully fetched during this run
    for source in sources:
        dl = source.get('download', None)
        script = None if dl is None else dl.get('script', None)
        if script is not None and script != os.path.basename(__file__):
            # the layer is handled by some other download script
            logging.info('Layer "%s" is not for us (%s != %s), skipping',
                         source['layername'],
                         script, os.path.basename(__file__))
            continue
        url = None if dl is None else dl.get('url', None)
        if url is None:
            logging.error('Layer "%s" has no source URL, ignoring',
                          source['layername'])
            continue
        cache = source.get('cache', None)
        dest = None if cache is None else cache.get('path', None)
        if dest is None:
            # the configuration loader is expected to guarantee a cache path
            raise RuntimeError(f'Layer "{source["layername"]}" has no cache path')
        elif url in downloads:
            logging.info('%s was already downloaded, skipping', dest)
            continue
        headers = {}
        user_agent = common.config.get('User-Agent', None)
        if user_agent is not None:
            headers['User-Agent'] = user_agent
        dest = str(dest)  # convert from Path()
        try:
            # create parent directories
            destdir = os.path.dirname(dest)
            common.makedirs(destdir, mode=0o755, dir_fd=destdir_fd, exist_ok=True,
                            logging=logging)

            # place an exclusive lock on a lockfile, as the destination can be
            # used by other layers and hence might be updated in parallel
            if lockdir_is_destdir:
                lockfile = dest + '.lck'
            else:
                # use a flat hierarchy when lockdir != destdir as this avoids
                # leaving empty directories behind when removing leftovers
                # with tmpfiles.d(5)
                lockfile = sha1(dest.encode('utf-8')).hexdigest() + '.lck'
            # use O_TRUNC to bump the lockfile's mtime
            lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC,
                              mode=0o644, dir_fd=lockdir_fd)
            try:
                logging.debug('flock("%s", LOCK_EX)', lockfile)
                flock(lock_fd, LOCK_EX)
                try:
                    st = os.stat(dest, dir_fd=destdir_fd)
                except (OSError, ValueError):
                    # the file doesn't exist, or stat() failed for some reason
                    pass
                else:
                    max_age = cache.get('max-age', 6*3600)  # 6h
                    if max_age is not None:
                        s = max_age + max(st.st_ctime, st.st_mtime) - time()
                        if s > 0:
                            logging.info('%s: Too young, try again in %s',
                                         dest, format_time(s))
                            continue
                    headers['If-Modified-Since'] = formatdate(timeval=st.st_mtime,
                                                              localtime=False, usegmt=True)
                max_size = dl.get('max-size', 2**26)  # 64MiB
                download(url, dest, dir_fd=destdir_fd, max_size=max_size,
                         headers=headers, session=session_requests)
                downloads.add(url)
            finally:
                os.close(lock_fd)
        except Exception:
            logging.exception('Could not download %s as %s', url, dest)
            if args.exit_code:
                rv = 1
    sys.exit(rv)