aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-download-mrr.py
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-28 12:47:31 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-28 17:18:12 +0200
commit981a97c572ee064b4f3ade5d1fc9e5c4397701ff (patch)
treea0dc80eb84457d3c470181a37167290fbe24cca2 /webmap-download-mrr.py
parent31feb9d0a559d5eb268db04ca91855b959568811 (diff)
Update Mineralrättigheter import logic.
Thanks to EU Directive 2019/1024 of the European Parliament and of the Council of 20 June 2019 on open data and the re-use of public sector information, and the Commission Implementing Regulation (EU) 2023/138 of 21 December 2022, the mineral register is now available under the terms of the CC0 1.0 Universal licence, see https://www.sgu.se/produkter-och-tjanster/geologiska-data/malmer-och-mineral--geologiska-data/mineralrattigheter-och-prospektering/ Given we no longer need to parse SVG images from the webmap, we drop webmap-download-mrr.py and add layers for expired and forbidden permits (ut_metaller_industrimineral_forfallna, ut_olja_gas_diamant_forfallna, bearbetningskoncessioner_forfallna, ut_metaller_industrimineral_forbud, ut_diamant_forbud) as well as markanvisningar_bk_ansokta. Unfortunately the GeoPackage file doesn't include peat concessions, so we drop them from config.yml for now. Should they be of interest we can always restore webmap-download-mrr.py and/or order the register from Bergsstaten, cf. https://resource.sgu.se/bergsstaten/exporter-ur-mrr-info.pdf .
Diffstat (limited to 'webmap-download-mrr.py')
-rw-r--r--webmap-download-mrr.py673
1 files changed, 0 insertions, 673 deletions
diff --git a/webmap-download-mrr.py b/webmap-download-mrr.py
deleted file mode 100644
index 696e46c..0000000
--- a/webmap-download-mrr.py
+++ /dev/null
@@ -1,673 +0,0 @@
-#----------------------------------------------------------------------
-# Backend utilities for the Klimatanalys Norr project (download MRR layers)
-# Copyright © 2024 Guilhem Moulin <info@guilhem.se>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-#----------------------------------------------------------------------
-
-# Unfortunately SGU/Bergsstaten doesn't offer MRR layer files to
-# download, cf.
-#
-# https://www.sgu.se/produkter-och-tjanster/geologiska-data/vara-data-per-amnesomrade/berggrundsgeologiska-data/mineralrattigheter/
-#
-# but it has an online webmap (WMS) at
-#
-# https://apps.sgu.se/kartvisare/kartvisare-mineralrattigheter.html
-#
-# so we add a dedicated module to probe and fetch features from it.
-
-from os import O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_CLOEXEC, O_TMPFILE
-import os
-from pathlib import Path
-from time import time, sleep, monotonic as time_monotonic
-import re
-import logging
-import requests
-import itertools
-import random
-import json
-from math import sqrt
-from hashlib import sha256
-from lxml import etree
-
-from osgeo import gdal, ogr
-gdal.UseExceptions()
-
-import common
-
-LAST_UPDATED = 'Last updated'
-class WMS:
- def __init__(self, headers={}, session=requests, url=None, version='1.1.1'):
- self.req_count = 0
- self.headers = headers
- self.session = session
- self.url = url
- self.version = version
-
- def __del__(self):
- self.print_req_count()
-
- def print_req_count(self, elapsed=None):
- if self.req_count is None or self.url is None:
- return
- elif elapsed is None:
- logging.info('%d WMS requests sent to %s', self.req_count, self.url)
- else:
- logging.info('%d WMS requests (%.2f\u202Freq/s) sent to %s', self.req_count,
- self.req_count/elapsed, self.url)
-
- # Send a request to the WMS server
- def req(self, request, content_type=None, params={}, max_tries=10, timeout=10):
- params2 = {
- 'SERVICE': 'WMS',
- 'VERSION': self.version,
- 'REQUEST': request,
- } | params
- for i in itertools.count(1):
- try:
- r = self.session.get(self.url,
- headers = self.headers,
- params = params2,
- timeout = timeout,
- )
- except (requests.Timeout, requests.ConnectionError) as e:
- if i < max_tries:
- logging.error('Timeout')
- continue
- raise e
- self.req_count += 1
- if i < max_tries and r.status_code in [requests.codes.bad_gateway, requests.codes.gateway_timeout]:
- logging.error(r.status_code)
- sleep(30)
- continue
- r.raise_for_status()
- if content_type is not None:
- content_type2 = r.headers.get('Content-Type')
- content_type2 = content_type2.split(';')[0] if content_type2 is not None else None
- if content_type2 is None or content_type2 != content_type:
- raise Exception(f'Content-Type mismatch: expected {content_type}, got {content_type2}')
- return r
-
- # Return metadata for the given feature
- def getFeatureInfo(self, x, y, layer, res=.25):
- content_type = 'application/json'
- width = height = 101 # value from https://apps.sgu.se/kartvisare/kartvisare-mineralrattigheter.html
- half_width = res * width/2
- half_height = res * height/2
- x1 = x - half_width
- x2 = x + half_width
- y1 = y - half_height
- y2 = y + half_height
- bbox = [x1, y1, x2, y2]
-
- max_count = 300
- r = self.req('GetFeatureInfo', content_type=content_type, params={
- 'LAYERS': layer,
- 'FORMAT': 'image/png',
- 'TRANSPARENT': 'true',
- 'TILED': 'false',
- 'WIDTH': width,
- 'HEIGHT': height,
- 'QUERY_LAYERS': layer,
- 'INFO_FORMAT': content_type,
- 'FEATURE_COUNT': max_count,
- 'X': 50,
- 'Y': 50,
- 'BBOX': ','.join(map(str, bbox)),
- 'SRS': common.config['SRS'],
- })
- data = r.json()
- if data.get('type', '') != 'FeatureCollection':
- raise Exception(f'Did not return a feature collection')
- if data.get('features', None) is None:
- raise Exception(f'Missing features list')
- if len(data.get('features')) >= max_count:
- logging.warning('Returned max number of features (%d), possibly missing some!', max_count)
- return data
-
- # Refine a geometry at the given resolution.
- # Return the iteration count at which the feature was found (or None if it was not found) along
- # with a dictionary `fingerprint:feature` found along the way.
- def refineGeometry(self, layername, fpr, feature, res=1, crs=None, max_tries=1024, features={}):
- geojson = json.dumps(feature.get('geometry'))
- geom = ogr.CreateGeometryFromJson(geojson)
- if geom is None:
- raise Exception(f'Invalid GeoJSON geometry')
- triangulation = triangulate(geom.MakeValid())
- i = 0
- while fpr not in features and i < max_tries:
- x, y = getRandomPoint(triangulation)
- data = self.getFeatureInfo(x, y, layername, res=res)
- crs2 = data.get('crs', None)
- features2 = data['features']
- if ((crs is None and crs2 is not None) or (crs is not None and crs2 is None)
- or (crs is not None and crs2 is not None and crs != crs2)):
- if len(features2) > 0:
- # the server should never return different CRSs
- raise Exception(f'{json.dumps(crs)} != {json.dumps(crs2)}')
- for feature2 in features2:
- fpr2, feature2 = mkFeature(feature2, layername)
- features[fpr2] = feature2
- i += 1
- if fpr not in features:
- return None, features
- return i, features
-
- # Refine geometries at the given resolution, modifying features in place
- def refineGeometries(self, features, layername, res=1, crs=None, max_tries=1024, progress=None):
- feature_count = len(features)
- features2 = {}
- pbar = None
- stats = {}
- try:
- if progress is not None:
- pbar = progress(total=feature_count, leave=False, unit='')
- for fpr, feature in features.items():
- i, _ = self.refineGeometry(layername, fpr, feature,
- res=res, crs=crs, max_tries=max_tries, features=features2)
- if i is None:
- i = '-'
- stats[i] = stats.get(i, 0) + 1
- if pbar is not None:
- pbar.update(1)
- finally:
- if pbar is not None:
- pbar.close()
-
- features.update(features2)
-
- # show refinement stats
- if stats.get(0, 0) + stats.get(1, 0) == feature_count:
- details = 'all on first attempt'
- else:
- details = []
- for ks, vs in sorted(stats.items(), key=lambda item: max_count+1 if item[0] == '-' else item[0]):
- if ks == '-':
- msg = f'{vs}× FAILED'
- elif len(details) > 0:
- msg = f'{vs}× {ks}'
- elif ks == 1:
- msg = f'{vs}× after 1 attempt'
- else:
- msg = f'{vs}× after {ks} attempts'
- details.append(msg)
- details = ', '.join(details)
- w = 'geometry' if feature_count == 1 else 'geometries'
- logging.info('Refined %d %s at resolution %s (%s)', feature_count, w, str(res), details)
-
- # Guess whether the cached layer at path is up to date
- def is_uptodate(self, layername, path, dir_fd=None,
- oldsvg_fp=None, newsvg='',
- max_age=864000):
- try:
- fd = os.open(path, O_RDONLY, dir_fd=dir_fd)
- except (OSError, ValueError):
- # couldn't open cached file
- logging.debug('%s is stale: open() failed', path)
- return False
-
- try:
- try:
- st = os.fstat(fd)
- svg_st = os.fstat(oldsvg_fp.fileno())
- except (OSError, ValueError):
- logging.debug('%s is stale: fstat() failed', path)
- return False
-
- if st.st_mtime < svg_st.st_mtime:
- # writing SVG is racy so always assume that a younger SVG
- # means a stale GeoJSON
- logging.debug('%s is stale: SVG is younger (%f < %f)', path,
- st.st_mtime, svg_st.st_mtime)
- return False
-
- try:
- oldsvg = oldsvg_fp.read()
- finally:
- # seek back to the begining (we then truncate and write the SVG)
- oldsvg_fp.seek(0)
- if newsvg is None or oldsvg != newsvg:
- logging.debug('%s is stale: SVG differs', path)
- return False
-
- if max_age is not None:
- current_time = time()
- mtime = max(st.st_ctime, st.st_mtime)
- if max_age + mtime < current_time:
- # always consider files older than max_age as stale
- logging.debug('%s is stale: too old (%f + %f < %f)', path,
- max_age, mtime, current_time)
- return False
-
- with os.fdopen(fd, mode='rb', closefd=False) as fp:
- try:
- # load cached geojson
- data = json.load(fp)
- except json.JSONDecodeError:
- logging.warning('%s is stale: not a valid JSON file', path)
- return False
- finally:
- os.close(fd)
-
- # more checks on the GeoJSON content
- if (not isinstance(data, dict) or 'type' not in data or data['type'] != 'FeatureCollection'
- or 'features' not in data or not isinstance(data['features'], list)):
- logging.warning('%s: not a valid GeoJSON file', path)
- return False
-
- crs = data.get('crs', None)
- data = data['features']
- if len(data) == 0:
- # always assume empty cached files are stale
- logging.debug('%s is stale: has no features', path)
- return False
-
- data = random.choice(data) # random features
- if (not isinstance(data, dict) or 'type' not in data or data['type'] != 'Feature'
- or 'geometry' not in data or not isinstance(data['geometry'], dict)):
- logging.warning('%s: not a valid GeoJSON file', path)
- return False
-
- last_updated = data['properties'].get(LAST_UPDATED, None)
- if last_updated is None or last_updated == '':
- logging.debug('%s is stale: %s has no "%s" value', path,
- json.dumps(data), LAST_UPDATED)
- return False
-
- # check whether the randomly chosen feature is still there and that
- # its "Last updated" value has not changed
- # (this check comes last as it is potentially expensive)
- fpr, data = mkFeature(data, layername)
- found, features = self.refineGeometry(layername, fpr, data, res=.25, crs=crs, max_tries=10)
- if not found:
- logging.debug('%s is stale: could not find feature %s', path, json.dumps(data))
- return False
-
- last_updated2 = features[fpr]['properties'].get(LAST_UPDATED, None)
- if last_updated2 is None or last_updated != last_updated2:
- logging.debug('%s is stale: "%s" values differ (%s != %s)',
- path, LAST_UPDATED, last_updated, last_updated2)
- return False
-
- # assume the cached file is up to date
- logging.debug('is_uptodate(%s, %s, …) == True', layername, path)
- return True
-
- def getFeatures(self, layername, path_strings, gt, progress=None):
- crs = None
- max_probe_count = 3
- probes = []
- for d in path_strings:
- d0 = d
- m = POINT0_RE.match(d)
- if m is None:
- raise Exception(f"Can't parse <path>'s \"d\" attribute value: \"{d0}\"")
- d = d.removeprefix(m.group())
- x = float(m.group(1))
- y = float(m.group(2))
- path = [ [x, y] ]
-
- # get first part of the path (until the pen is moved)
- while True:
- m = POINT_RE.match(d)
- if m is None:
- break
- d = d.removeprefix(m.group())
- x = float(m.group(1))
- y = float(m.group(2))
- if path[-1] != [x, y]:
- path.append([x, y])
- if d.startswith('Z') and len(path) > 0:
- if path[0] != path[-1]:
- path.append(path[0])
- d = d.removeprefix("Z")
- logging.debug('d="%s" -> path=%s, rest="%s"', d0, path, d)
-
- # convert to an outer ring
- ring0 = ogr.Geometry(ogr.wkbLinearRing)
- for px, py in path:
- x, y = gdal.ApplyGeoTransform(gt, px, py)
- ring0.AddPoint_2D(x, y)
-
- poly0 = ogr.Geometry(ogr.wkbPolygon)
- poly0.AddGeometry(ring0)
-
- # randomly probe once per 10km² (1000ha), but not more than max_probe_count
- probe_count = poly0.GetArea() / 1e7
- if probe_count < 1:
- probe_count = 1
- elif probe_count > max_probe_count:
- probe_count = max_probe_count
- else:
- probe_count = round(probe_count)
-
- probes.append( (poly0.MakeValid(), probe_count) )
-
- if len(probes) == 0:
- self.req_count = None # don't print request count
- return {}, crs
-
- features = {}
- probe_count_sum = sum([n for _, n in probes])
- pbar = None
- try:
- if progress is not None:
- pbar = progress(total=probe_count_sum, leave=False, unit='')
- for geom, probe_count in probes:
- triangulation = triangulate(geom)
-
- # always try the centroid first (later fallback to a random
- # point if the centroid is not in geom)
- x, y = nearCentroidWithin(geom)
-
- # crude probing and add feature(s) found along the way
- for i in range(probe_count):
- if i > 0 or x is None or y is None:
- x, y = getRandomPoint(triangulation)
-
- data = self.getFeatureInfo(x, y, layername, res=1024)
- crs2 = data.get('crs', None)
- data = data['features']
- if crs2 is not None:
- if crs is None:
- crs = crs2
- elif crs != crs2 and len(data) > 0:
- # the server should never return different CRSs
- raise Exception(f'{json.dumps(crs)} != {json.dumps(crs2)}')
-
- for feature in data:
- fpr, feature = mkFeature(feature, layername)
- features[fpr] = feature
- data = None
-
- if pbar is not None:
- pbar.update(1)
- geom = None
- finally:
- del probes[:]
- if pbar is not None:
- pbar.close()
-
- logging.info('Found %d unique feature(s) after crude probing', len(features))
- return features, crs
-
-# Triangulate the input geometry
-def triangulate(geom):
- geomType = geom.GetGeometryType()
- if geomType == ogr.wkbPolygon or geomType == ogr.wkbMultiPolygon or geomType == ogr.wkbGeometryCollection:
- if geom.Area() < 1e-9:
- # degenerate polygon
- return geom.Boundary()
- elif geomType == ogr.wkbLineString or geomType == ogr.wkbMultiLineString:
- return geom
- elif geomType == ogr.wkbPoint or geomType == ogr.wkbMultiPoint:
- return geom
- else:
- raise Exception(f'Invalid geometry type: {geom}')
-
- triangulation = geom.DelaunayTriangulation(dfTolerance=0., bOnlyEdges=False)
- triangulation2 = ogr.Geometry(ogr.wkbGeometryCollection)
- for i in range(triangulation.GetGeometryCount()):
- triangle = triangulation.GetGeometryRef(i)
- if geom.Contains(triangle):
- triangulation2.AddGeometry(triangle)
- if triangulation2.IsEmpty():
- # can happen in some degenerate cases
- return triangulation
- else:
- return triangulation2
-
-# Return the centroid (modulo rounding) if is within the input geometry,
-# and None, None otherwise
-def nearCentroidWithin(geom, decimals=2):
- centroid = geom.Centroid()
- if centroid.GetGeometryType() != ogr.wkbPoint:
- raise Exception(f'{centroid} is not a point')
- x, y = centroid.GetPoint_2D(0)
- if decimals is not None:
- x = round(x, decimals)
- y = round(y, decimals)
- pt = ogr.Geometry(ogr.wkbPoint)
- pt.AddPoint_2D(x, y)
- if pt.Within(geom):
- return x, y
- else:
- return None, None
-
-# Get a random point within the given geometry
-# The geometry needs to be triangulated first for speed, see
-# see https://dev.to/bogdanalexandru/generating-random-points-within-a-polygon-in-unity-nce
-def getRandomPoint(triangulation, decimals=2, max_tries=1024):
- # special cases if source is a degenerate polygon
- geomType = triangulation.GetGeometryType()
- if geomType == ogr.wkbMultiPoint:
- if triangulation.GetGeometryCount() == 0:
- raise Exception(f'Empty {triangulation}')
- i = random.randrange(0, triangulation.GetGeometryCount())
- pt = triangulation.GetGeometryRef(i)
- return pt.GetX(), pt.GetY()
- elif geomType == ogr.wkbLineString:
- i = random.uniform(0, triangulation.Length())
- pt = triangulation.Value(i)
- return pt.GetX(), pt.GetY()
- elif geomType == ogr.wkbMultiLineString:
- lengths = []
- for i in range(triangulation.GetGeometryCount()):
- line = triangulation.GetGeometryRef(i)
- lengths.append(line.Length())
- k = random.choices(range(triangulation.GetGeometryCount()), weights=lengths, k=1)[0]
- line = triangulation.GetGeometryRef(k)
- i = random.uniform(0, line.Length())
- pt = line.Value(i)
- return pt.GetX(), pt.GetY()
- elif geomType != ogr.wkbGeometryCollection:
- raise Exception(f'Not a GeometryCollection {triangulation}')
-
- areas = []
- for i in range(triangulation.GetGeometryCount()):
- triangle = triangulation.GetGeometryRef(i)
- areas.append(triangle.GetArea())
-
- for i in range(max_tries):
- # pick a random triangle (weighted by area)
- k = random.choices(range(triangulation.GetGeometryCount()), weights=areas, k=1)[0]
- triangle = triangulation.GetGeometryRef(k)
-
- ring = triangle.GetGeometryRef(0)
- if triangle.GetGeometryType() != ogr.wkbPolygon or triangle.GetGeometryCount() != 1 or ring.GetPointCount() != 4:
- raise Exception('Not a triangle')
-
- x0, y0 = ring.GetPoint_2D(0)
- x1, y1 = ring.GetPoint_2D(1)
- x2, y2 = ring.GetPoint_2D(2)
-
- # pick a random point inside that triangle
- r1 = random.uniform(0, 1)
- r2 = random.uniform(0, 1)
- sqrt_r1 = sqrt(r1)
- sqrt_r2 = sqrt(r2)
-
- x = (1 - sqrt_r1) * x0 + sqrt_r1 * (1 - r2) * x1 + sqrt_r1 * r2 * x2
- y = (1 - sqrt_r1) * y0 + sqrt_r1 * (1 - r2) * y1 + sqrt_r1 * r2 * y2
- if decimals is not None:
- x = round(x, decimals)
- y = round(y, decimals)
- pt = ogr.Geometry(ogr.wkbPoint)
- pt.AddPoint_2D(x, y)
- if pt.Within(triangulation):
- return x, y
- # might need to retry due to rounding
- raise Exception(f"Couldn't find point within geometry pt={pt} triangle={triangle} k={k} triangulation={triangulation}")
-
-# Return a feature along with its "signature"
-def mkFeature(feature, layername):
- properties = feature.get('properties', None)
- if properties is None:
- raise Exception(f'Feature lacks "properties" attributes')
- fid = feature.pop('id', None)
- if fid is not None and layername is not None and not fid.startswith(layername.removeprefix('MRR:')):
- raise Exception(f'Unexpected ID {fid} for layer {layername}')
-
- properties2 = properties.copy()
- properties2.pop(LAST_UPDATED, None) # always ignore "Last updated" values
- properties2 = json.dumps(properties2, ensure_ascii=False, sort_keys=True, separators=(',', ':'))
- fpr = sha256(properties2.encode('utf-8') + b'\x00' + layername.encode('utf-8'))
- return fpr.hexdigest(), {
- # RFC 7946 sec. 3.2
- 'type': 'Feature',
- 'geometry': feature.get('geometry'),
- 'properties': properties,
- }
-
-FLOAT_RE = r'(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)'
-PATH_RE = re.compile(r'^M\s*' +
- FLOAT_RE + r'\s+' + FLOAT_RE + r'\s+' +
- r'(?:L\s*' + FLOAT_RE + r'\s+' + FLOAT_RE + r'\s+)+' +
- r'(?:[ZM]|$)')
-POINT0_RE = re.compile(r'^M\s*(' + FLOAT_RE + r')\s+(' + FLOAT_RE + r')\s+')
-POINT_RE = re.compile(r'^L\s*(' + FLOAT_RE + r')\s+(' + FLOAT_RE + r')(?:\s+|$)')
-
-def download(dest, dl, dir_fd=None, headers={}, session=requests, progress=None):
- dest_path = Path(dest)
- if dest_path.suffix.lower() != '.geojson':
- # mostly to check that nothing ends in .svg
- raise Exception(f'{dest} must end with .geojson')
- dest_tmp = str(dest_path.with_stem(f'.{dest_path.stem}.new'))
- try:
- # delete any leftover
- os.unlink(dest_tmp, dir_fd=dir_fd)
- except FileNotFoundError:
- pass
-
- layername = dl['layername']
- logging.info(f'Processing layer %s…', layername)
- headers.pop('If-Modified-Since', None) # never send If-Modified-Since headers
-
- start = time_monotonic()
- wms = WMS(
- url='https://maps3.sgu.se/geoserver/wms',
- version='1.1.1',
- headers=headers,
- session=session,
- )
-
- # download the SVG from the WMS server and try to parse it to list
- # all features; an alternative would be to download a PNG and probe
- # every single non-transparent pixel, but it's very slow (and not
- # kind to the WMS server) due to the sheer amount of such pixels
- bbox = common.config['extent']
- res = 256
- image_format = 'image/svg+xml'
- r = wms.req('GetMap', content_type=image_format, params={
- 'LAYERS': layername,
- 'FORMAT': image_format,
- 'TRANSPARENT': 'true',
- 'TILED': 'false',
- 'WIDTH': f'{(bbox[2]-bbox[0])/res:.0f}',
- 'HEIGHT': f'{(bbox[3]-bbox[1])/res:.0f}',
- 'BBOX': ','.join(map(str, bbox)),
- 'SRS': common.config['SRS'],
- })
-
- svg_path = str(dest_path.with_suffix('.svg'))
- svg_fd = os.open(svg_path, O_RDWR|O_CREAT|O_CLOEXEC, mode=0o644, dir_fd=dir_fd)
- try:
- with os.fdopen(svg_fd, mode='wb+', closefd=False) as fp:
- if wms.is_uptodate(layername, dest, dir_fd=dir_fd,
- oldsvg_fp=fp, newsvg=r.content):
- logging.info('%s appears to be up to date, skipping', dest)
- wms.req_count = None # don't print request count
- return
-
- # not atomic, but not a big deal (only used for comparison and mismatch
- # causes the layer to be downloaded again); it's not possible to write
- # the .geojson and the .svg in the same transaction anyway
- fp.truncate()
- fp.write(r.content)
- finally:
- os.close(svg_fd)
-
- # https://gdal.org/tutorials/geotransforms_tut.html
- gt = [bbox[0], res, 0, bbox[3], 0, -res]
-
- ns = 'http://www.w3.org/2000/svg'
- tree = etree.XML(r.content)
-
- # parse SVG paths (approximation):
- # https://developer.mozilla.org/en-US/docs/Web/SVG/Tutorial/Paths#line_commands
- path_strings = set()
- for elem in tree.iterfind(f'.//{{{ns}}}path'):
- if elem.getparent().tag == f'{{{ns}}}clipPath':
- continue
- d = elem.attrib.get('d')
- if d is None:
- logging.error('<path> element has no "d" attribute')
- continue
- m = PATH_RE.match(d)
- if m is None:
- logging.error("Can't parse <path>'s \"d\" attribute value: \"%s\"", d)
- continue
- path_strings.add(d)
- logging.info('Found %d unique SVG path(s)', len(path_strings))
-
- features, crs = wms.getFeatures(layername, path_strings, gt, progress=progress)
-
- # refine geometries
- if len(features) > 0:
- #resolutions = [128, 16, 2, .25]
- resolutions = [16, .25]
- for res in resolutions:
- wms.refineGeometries(features, layername, crs=crs, res=res, progress=progress)
- logging.info('Found %d unique feature(s) after refinement', len(features))
-
- features = {
- 'type': 'FeatureCollection',
- 'name': layername,
- 'crs': crs,
- 'features': list(features.values()),
- }
-
- # write the destination GeoJSON file
- fd = os.open(os.path.dirname(dest), O_WRONLY|O_CLOEXEC|O_TMPFILE, mode=0o644, dir_fd=dir_fd)
- try:
- with os.fdopen(fd, mode='w', closefd=False) as fp:
- json.dump(features, fp, ensure_ascii=False, separators=(',', ':'))
- st = os.fstat(fd)
- size = st.st_size
- feature_count = len(features['features'])
-
- # XXX unfortunately there is no way for linkat() to clobber the destination,
- # so we use a temporary file; it's racy, but thanks to O_TMPFILE better
- # (shorter race) than if we were dumping chunks in a named file descriptor
- os.link(f'/proc/self/fd/{fd}', dest_tmp, dst_dir_fd=dir_fd, follow_symlinks=True)
- finally:
- os.close(fd)
-
- try:
- # atomic rename (ensures output is never partially written)
- os.rename(dest_tmp, dest, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
- except (OSError, ValueError) as e:
- try:
- os.unlink(dest_tmp, dir_fd=dir_fd)
- finally:
- raise e
-
- elapsed = time_monotonic() - start
- wms.print_req_count(elapsed=elapsed)
- wms.req_count = None
- logging.info('%s: Fetched %d features (%s) in %s (%.2f\u202Ffeat/s, %s/s)', dest,
- feature_count, common.format_bytes(size),
- common.format_time(elapsed),
- feature_count/elapsed,
- common.format_bytes(int(size/elapsed)))