author     Guilhem Moulin <guilhem@fripost.org>    2025-05-20 11:57:55 +0200
committer  Guilhem Moulin <guilhem@fripost.org>    2025-05-21 18:57:50 +0200
commit     9b192ebb312741672babd46523d8558b6e74ff9a (patch)
tree       7af98a5c5526aaf486416c818c462192e16bc6ef
parent     d3df2c2ef8d253ffa3365d0eec326bb611b9b7e2 (diff)
webmap-import: Add option to generate Mapbox Vector Tiles (MVT).
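The new code drives GDAL's MVT driver in-process rather than shelling out to ogr2ogr. A minimal sketch of that driver workflow, assuming recent GDAL Python bindings; the path and option values here are illustrative, not taken from this commit:

```python
from osgeo import gdal, ogr, osr

gdal.UseExceptions()

# The MVT driver writes a directory of tiles; generation happens when
# the dataset is closed.  '/tmp/tiles' and the options are examples.
drv = gdal.GetDriverByName('MVT')
ds = drv.Create('/tmp/tiles', 0, 0, 0, eType=gdal.GDT_Unknown,
                options=['FORMAT=DIRECTORY', 'COMPRESS=NO', 'MAXZOOM=10'])

srs = osr.SpatialReference()
srs.ImportFromEPSG(3857)  # WebMercator, the default tiling scheme

lyr = ds.CreateLayer('demo', srs=srs, geom_type=ogr.wkbPoint)
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeometryDirectly(ogr.CreateGeometryFromWkt('POINT (0 0)'))
lyr.CreateFeature(feat)
ds = None  # closing the dataset is what actually writes the tiles
```

Creation options such as MINZOOM/MAXZOOM/CONF mirror the keys that export_mvt.py assembles below.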
-rw-r--r-- | config.yml         |   5
-rw-r--r-- | export_mvt.py      | 474
-rw-r--r-- | import_source.py   |   4
-rw-r--r-- | rename_exchange.py |  48
-rwxr-xr-x | webmap-import      |  64
-rwxr-xr-x | webmap-publish     | 775

6 files changed, 578 insertions, 792 deletions
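export_mvt.py writes uncompressed tiles (OpenLayers cannot decode compressed tile responses itself) and, with --compress-tiles, adds pre-compressed .br siblings so a web server can serve them statically. A hedged sketch of that sibling-file scheme, using the same brotli.Compressor API the new code relies on (the helper name is illustrative):

```python
import brotli  # the module the commit imports; exposes Compressor

def compress_sibling(path: str, quality: int = 5) -> tuple[int, int]:
    """Write path + '.br' next to path; return (raw, compressed) sizes."""
    comp = brotli.Compressor(mode=brotli.MODE_GENERIC, quality=quality)
    size_in = size_out = 0
    # 'xb' fails if the .br file already exists, like O_EXCL below
    with open(path, 'rb') as fin, open(path + '.br', 'xb') as fout:
        for chunk in iter(lambda: fin.read(65536), b''):
            size_in += len(chunk)
            size_out += fout.write(comp.process(chunk))
        size_out += fout.write(comp.finish())
    return size_in, size_out
```

With e.g. nginx's ngx_brotli module (`brotli_static on;`), a request for tile.pbf from a client sending `Accept-Encoding: br` is then answered from tile.pbf.br with no compression work at request time.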
diff --git a/config.yml b/config.yml
@@ -103,11 +103,6 @@ dataset:
   USER: webmap_import
   DBNAME: webmap
-  # Optional driver-specific dataset open options which overrides the
-  # above 'open-options' settings when publishing.
-  open-options-publish:
-    USER: webmap_guest
-
   layercache: public.layercache
 
 # Optional dictionary of default layer creation options, cf.
diff --git a/export_mvt.py b/export_mvt.py
new file mode 100644
index 0000000..e9b54f4
--- /dev/null
+++ b/export_mvt.py
@@ -0,0 +1,474 @@
+#!/usr/bin/python3
+
+#----------------------------------------------------------------------
+# Backend utilities for the Klimatanalys Norr project (create MVT tiles)
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+#----------------------------------------------------------------------
+
+# pylint: disable=invalid-name, missing-module-docstring, fixme
+
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_EXCL, O_CLOEXEC, O_DIRECTORY, F_OK
+import os
+from errno import EAGAIN
+from fcntl import flock, LOCK_EX
+import json
+import logging
+from pathlib import Path
+import shutil
+import tempfile
+from typing import Any, Iterator, Optional
+from time import monotonic as time_monotonic
+
+import brotli
+from osgeo import gdal, ogr, osr
+
+from common import BadConfiguration, escape_identifier, format_bytes, format_time
+from common_gdal import getExtent, getSRS, getSpatialFilterFromGeometry
+from rename_exchange import rename_exchange
+
+def parseTilingScheme(scheme : list[Any]|None) -> tuple[osr.SpatialReference, ogr.Geometry|None]:
+    """Get SRS and extent from MVT tiling scheme (crs, tile_origin_upper_left_x,
+    tile_origin_upper_left_y, tile_dimension_zoom_0)."""
+    if scheme is None:
+        srs = osr.SpatialReference()
+        # standard WebMercator tiling scheme
+        srs.ImportFromEPSG(3857)
+        return srs, None
+    if not isinstance(scheme, list) or len(scheme) != 4:
+        raise BadConfiguration(f'Invalid tiling scheme: {scheme}')
+    srs = getSRS(scheme[0])
+    if not srs.IsProjected():
+        raise RuntimeError('SRS is not projected: ' + srs.ExportToPrettyWkt())
+
+    minX = minY = maxX = maxY = None
+    for i in range(srs.GetAxesCount()):
+        orientation = srs.GetAxisOrientation('PROJCS', i)
+        if orientation == osr.OAO_North:
+            assert minY is None and maxY is None
+            maxY = scheme[2]
+            minY = maxY - scheme[3]
+        elif orientation == osr.OAO_South:
+            assert minY is None and maxY is None
+            minY = scheme[2]
+            maxY = minY + scheme[3]
+        elif orientation == osr.OAO_East:
+            assert minX is None and maxX is None
+            minX = scheme[1]
+            maxX = minX + scheme[3]
+        elif orientation == osr.OAO_West:
+            assert minX is None and maxX is None
+            maxX = scheme[1]
+            minX = maxX - scheme[3]
+    if minX is None or minY is None or maxX is None or maxY is None:
+        raise RuntimeError('Unknown axes meaning for ' + srs.ExportToPrettyWkt())
+    extent = (minX, minY, maxX, maxY)
+    logging.debug('Parsed tiling scheme: extent=%s, srs="%s"', str(extent), srs.GetName())
+    return srs, getExtent(extent, srs=srs)
+
+# pylint: disable-next=dangerous-default-value
+def createMVT(drv : gdal.Driver, path : str,
+              options : dict[str,str|int|float|None] = {},
+              default_options : dict[str,Any]|None = None) -> gdal.Dataset:
+    """Create and return the MVT file/directory (which must not exist)."""
+    # merge options from the config file
+    default_map = {
+        'min-zoom': 'MINZOOM',
+        'max-zoom': 'MAXZOOM',
+        'max-size': 'MAX_SIZE',
+        'max-features': 'MAX_FEATURES',
+        'simplification': 'SIMPLIFICATION',
+        'simplification-max-zoom': 'SIMPLIFICATION_MAX_ZOOM',
+        'tiling-scheme': 'TILING_SCHEME',
+        'extent': 'EXTENT',
+    }
+    if default_options is not None:
+        for k, v in default_options.items():
+            if k not in default_map:
+                logging.warning('No default map for key "%s", ignoring', k)
+                continue
+            if options is None:
+                options = {}
+            k = default_map[k]
+            if k not in options:
+                if isinstance(v, list):
+                    v = ','.join(map(str, v))
+                options[k] = v
+
+    kwargs = { 'eType': gdal.GDT_Unknown }
+    if options is not None:
+        kwargs['options'] = opts = []
+        for k, v in options.items():
+            opts.append(k + '=' + str(v))
+
+    logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs['eType'],
+                  ', options=' + str(kwargs['options']) if 'options' in kwargs else '')
+    return drv.Create(path, 0, 0, 0, **kwargs)
+
+# pylint: disable-next=too-many-branches
+def exportSourceLayer(ds_src : gdal.Dataset,
+                      lyr_src : ogr.Layer,
+                      lyr_dst : ogr.Layer,
+                      layerdef : dict[str,Any],
+                      extent : ogr.Geometry|None = None) -> int:
+    """Export a source layer."""
+    count0 = -1
+    if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
+        count0 = lyr_src.GetFeatureCount(force=0)
+
+    layername = lyr_src.GetName()
+    srs_src = lyr_src.GetSpatialRef()
+    if srs_src is None:
+        raise RuntimeError(f'Source layer "{layername}" has no SRS')
+    srs_dst = lyr_dst.GetSpatialRef()
+    if srs_dst is None:
+        raise RuntimeError(f'Destination layer "{lyr_dst.GetName()}" has no SRS')
+
+    if srs_dst.IsSame(srs_src):
+        logging.debug('Source and destination have the same SRS (%s), '
+                      'skipping coordinate transformation',
+                      srs_dst.GetName())
+        ct = None
+    else:
+        # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0
+        # when possible, see apps/ogr2ogr_lib.cpp
+        # pylint: disable=duplicate-code
+        logging.debug('Creating transformation from source SRS (%s) to destination SRS (%s)',
+                      srs_src.GetName(), srs_dst.GetName())
+        ct = osr.CoordinateTransformation(srs_src, srs_dst)
+        if ct is None:
+            raise RuntimeError('Could not create transformation from source SRS '
+                               f'({srs_src.GetName()}) to destination SRS ({srs_dst.GetName()})')
+
+    # get geometry field of the source layer
+    defn = lyr_src.GetLayerDefn()
+    geomFieldCount = defn.GetGeomFieldCount()
+    if geomFieldCount != 1:
+        logging.warning('Source layer "%s" has %d != 1 geometry fields',
+                        layername, geomFieldCount)
+        if geomFieldCount == 0:
+            return 0
+    geomField = defn.GetGeomFieldDefn(0)
+
+    if extent is None:
+        spatialFilter = None
+        clip = None
+    else:
+        # transform extent to source SRS
+        spatialFilter = getSpatialFilterFromGeometry(extent, srs_src)
+        # ensure clipping geometries remain simple (mere rectangles)
+        clip = getSpatialFilterFromGeometry(extent, srs_dst, fail_if_srs_not_equivalent = True)
+
+    transform_geometry = layerdef.get('transform-geometry', None)
+    columns = [ 'm.' + escape_identifier(lyr_src.GetFIDColumn()) ]
+    geomFieldName_esc = escape_identifier(geomField.GetName())
+    if transform_geometry is None:
+        columns.append('m.' + geomFieldName_esc)
+    elif transform_geometry == 'centroid':
+        columns.append('ST_Centroid(m.' + geomFieldName_esc + ') AS ' + geomFieldName_esc)
+    else:
+        raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')
+
+    query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(layername) + ' m'
+
+    cond = layerdef.get('where', None)
+    if cond is not None:
+        query += ' WHERE ' + cond.strip()
+
+    logging.debug('ExecuteSQL(%s%s)', query,
+                  '' if spatialFilter is None else ', spatialFilter=' + spatialFilter.ExportToWkt())
+    lyr_src = ds_src.ExecuteSQL(query, spatialFilter=spatialFilter)
+    try:
+        count1 = -1
+        if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
+            count1 = lyr_src.GetFeatureCount(force=0)
+        if count0 >= 0 and count1 >= 0:
+            logging.debug('Source layer "%s" has %d features, of which %d are to be exported',
+                          layername, count0, count1)
+
+        defn_dst = lyr_dst.GetLayerDefn()
+        eGType_dst = defn_dst.GetGeomType()
+        dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown
+
+        count = count_degenerates = 0
+        feature = lyr_src.GetNextFeature()
+        while feature is not None:
+            geom = feature.GetGeometryRef().Clone()
+            if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE:
+                raise RuntimeError('Could not apply coordinate transformation')
+            geom.FlattenTo2D()
+            if clip is not None:
+                # GDAL generates tiles beyond the tiling scheme extents so we need to
+                # clip manually
+                dim = geom.GetDimension()
+                geom = geom.Intersection(clip)
+                if geom.GetDimension() != dim:
+                    # degenerate intersection, skip feature
+                    count_degenerates += 1
+                    feature = lyr_src.GetNextFeature()
+                    continue
+            if not dGeomIsUnknown:
+                geom = ogr.ForceTo(geom, eGType_dst)
+
+            feature2 = ogr.Feature(defn_dst)
+            feature2.SetGeometryDirectly(geom)
+            feature2.SetFID(feature.GetFID())
+            if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
+                raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
+            count += 1
+            feature = lyr_src.GetNextFeature()
+
+        logging.info('Exported %d features (%d degenerate skipped) to MVT layer "%s" from "%s"',
+                     count, count_degenerates, lyr_dst.GetName(), layername)
+
+    finally:
+        ds_src.ReleaseResultSet(lyr_src)
+        lyr_src = None
+    return count
+
+def list_files(top : str,
+               dir_fd : Optional[int] = None,
+               follow_symlinks : bool = False,
+               ext : Optional[str] = None) -> Iterator[tuple[str,int]]:
+    """Generator of filenames and dir_fds."""
+    for _dirname, _, filenames, dir_fd2 in os.fwalk(top=top,
+                                                    dir_fd=dir_fd,
+                                                    follow_symlinks=follow_symlinks):
+        for filename in filenames:
+            if ext is None or filename.lower().endswith(ext):
+                yield filename, dir_fd2
+
+def write_all(fd : int, buf : bytes) -> int:
+    """Wrapper around os.write() that loops until the entire data is written."""
+    offset = 0
+    while offset < len(buf):
+        try:
+            n = os.write(fd, buf[offset:])
+        except IOError as e:
+            if e.errno != EAGAIN:
+                raise e
+            #logging.debug('Got EAGAIN while trying to write')
+        else:
+            #logging.debug('Wrote %d bytes of %d', n, len(buf) - offset)
+            offset += n
+    return offset
+
+def compress_brotli(path : str,
+                    suffix : str = '.br',
+                    dir_fd : Optional[int] = None,
+                    # TODO increase chunksize to see if it helps with performance
+                    chunksize : int = 65536,
+                    oFlags : int = 0,
+                    **kwargs) -> tuple[int, int]:
+    """Compress a file to Brotli, returning a pair of original and compressed size."""
+    compressor = brotli.Compressor(**kwargs)
+    fd_in = os.open(path, O_RDONLY|O_CLOEXEC, dir_fd=dir_fd)
+    size_in = size_out = 0
+    try:
+        # hard-code the mode to save an fstat(2) call
+        fd_out = os.open(path + suffix, O_WRONLY|O_CREAT|O_CLOEXEC|oFlags,
+                         mode=0o0644, dir_fd=dir_fd)
+        try:
+            buf_in = os.read(fd_in, chunksize)
+            while len(buf_in) > 0:
+                #logging.debug('Read %d bytes of %d', len(buf_in), chunksize)
+                size_in += len(buf_in)
+                size_out += write_all(fd_out, compressor.process(buf_in))
+                buf_in = os.read(fd_in, chunksize)
+            size_out += write_all(fd_out, compressor.finish())
+        finally:
+            os.close(fd_out)
+    finally:
+        os.close(fd_in)
+    return size_in, size_out
+
+# pylint: disable-next=too-many-branches, too-many-statements
+def exportMVT(ds : gdal.Dataset,
+              layers : dict[str,dict[str,Any]],
+              tmpdir : Path|None,
+              dst : Path,
+              mvtname : str = 'mvt',
+              drvname : str = 'MVT',
+              default_options : dict[str,Any]|None = None,
+              tile_extension : str = '.pbf',
+              compress : bool = False) -> None:
+    """Export some layers to MVT."""
+    drv = gdal.GetDriverByName(drvname)
+    if drv is None:
+        raise RuntimeError(f'Unknown GDAL driver for format "{drvname}"')
+
+    srs, extent = parseTilingScheme(default_options.get('tiling-scheme', None))
+
+    export_layers = {}
+    mvtconf = {}
+    for layername, layerdef in layers.items():
+        exportdef = layerdef.get('publish', None)
+        if exportdef is None:
+            raise RuntimeError(f'Layer "{layername}" has no publication definition')
+        if isinstance(exportdef, str):
+            exportdef = { exportdef:{} }
+        elif isinstance(exportdef, list):
+            exportdef = { l:{} for l in exportdef }
+        for export_layername, export_layerdef in exportdef.items():
+            if export_layername in export_layers:
+                raise RuntimeError(f'Duplicate definition for {export_layername}')
+            x = {}
+            for k in ('target_name', 'minzoom', 'maxzoom'):
+                if k in export_layerdef:
+                    x[k] = export_layerdef[k]
+            mvtconf[export_layername] = x
+            export_layers[export_layername] = (layername, export_layerdef)
+
+    if tmpdir is None:
+        # use a temporary directory in the parent to make sure we can
+        # atomically rename/exchange directories
+        tmpdir2 = tempfile.mkdtemp(prefix='.tmp.mvt-', dir=str(dst.parent))
+        logging.debug('Using "%s" as temporary directory for MVT', tmpdir2)
+    else:
+        # assume it already exists (it's shared by all invocations of the program)
+        tmpdir2 = str(tmpdir)
+
+    dir_fd = os.open(tmpdir2, O_RDONLY|O_CLOEXEC|O_DIRECTORY)
+    try:
+        if tmpdir is not None:
+            logging.debug('flock("%s", LOCK_EX)', tmpdir2)
+            flock(dir_fd, LOCK_EX)
+            with os.scandir(dir_fd) as it:
+                if next(it, None) is not None:
+                    logging.warning('Temporary directory "%s" exists and is not empty',
+                                    str(tmpdir))
+
+        # createMVT() crashes if mvtname exists, so that's also what we want here
+        tmpname = '.' + mvtname + '-tmp'
+        os.mkdir(tmpname, mode=0o700, dir_fd=dir_fd)
+
+        start = time_monotonic()
+        basedir = Path(f'/proc/self/fd/{dir_fd}')
+        # gdal.create() raises an exception when path exists, and that's what we want
+        dso = createMVT(drv, path=str(basedir.joinpath(mvtname)),
+                        default_options=default_options,
+                        options = {
+                            'FORMAT': 'DIRECTORY',
+                            # OpenLayers doesn't support compressed tiles, so instead
+                            # we use brotli_static to let the browser transparently
+                            # uncompress the responses
+                            'COMPRESS': 'NO',
+                            'TYPE': 'overlay',
+                            'BUFFER': 32,
+                            'TILE_EXTENSION': tile_extension.removeprefix('.'),
+                            'TEMPORARY_DB': str(basedir.joinpath(tmpname).joinpath('tmp-mvt.db')),
+                            'CONF': json.dumps(mvtconf, ensure_ascii=False, separators=(',',':')),
+                        })
+
+        layer_count = feature_count = 0
+        for layername, (layername_src, layerdef) in export_layers.items():
+            # create destination layer
+            lyr_src = ds.GetLayerByName(layername_src)
+            if lyr_src is None:
+                raise RuntimeError(f'Source dataset has no layer named "{layername_src}"')
+            transform_geometry = layerdef.get('transform-geometry', None)
+            if transform_geometry is None:
+                geom_type = ogr.GT_Flatten(lyr_src.GetGeomType())
+            elif transform_geometry == 'centroid':
+                geom_type = ogr.wkbPoint
+            else:
+                raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')
+            logging.debug('Creating destination MVT layer "%s" with SRS %s and type %s',
+                          layername, srs.GetName(), ogr.GeometryTypeToName(geom_type))
+            lyr_dst = dso.CreateLayer(layername, srs=srs, geom_type=geom_type)
+            if lyr_dst is None:
+                raise RuntimeError(f'Could not create destination layer "{layername}"')
+
+            # TODO: GDAL exports features to a temporary SQLite database even though the
+            # source is PostGIS, which is able to generate MVT with ST_AsMVT() itself.
+            # Letting PostGIS generate tiles is likely to speed things up.
+            feature_count += exportSourceLayer(ds, lyr_src, lyr_dst, layerdef, extent=extent)
+            layer_count += 1
+            lyr_dst = None
+            lyr_src = None
+
+        dso = None
+        start2 = time_monotonic()
+        logging.info('Exported %d features to %d MVT layers in %s',
+                     feature_count, layer_count, format_time(start2 - start))
+
+        tile_count = size_tot = size_tot_z = 0
+        size_min = size_max = size_min_z = size_max_z = None
+        for filename, dir_fd2 in list_files(top=mvtname, dir_fd=dir_fd,
+                                            follow_symlinks=False,
+                                            ext=tile_extension):
+            tile_count += 1
+            if not compress:
+                szi = os.stat(filename, dir_fd=dir_fd2).st_size
+            else:
+                # TODO: benchmark brotli quality to trade-off compression ratio vs. execution time
+                szi, szo = compress_brotli(filename,
+                                           dir_fd=dir_fd2,
+                                           oFlags=O_EXCL,
+                                           mode=brotli.MODE_GENERIC,
+                                           quality=5)
+                size_tot_z += szo
+                if size_min_z is None or szo < size_min_z:
+                    size_min_z = szo
+                if size_max_z is None or size_max_z < szo:
+                    size_max_z = szo
+            size_tot += szi
+            if size_min is None or szi < size_min:
+                size_min = szi
+            if size_max is None or size_max < szi:
+                size_max = szi
+
+        if tile_count > 0:
+            logging.info('Tile count: %d [min=%s, max=%s, sum=%s, avg=%s]',
+                         tile_count,
+                         format_bytes(size_min), format_bytes(size_max),
+                         format_bytes(size_tot), format_bytes(round(size_tot/tile_count)))
+            if compress:
+                logging.info('Compression ratio: %.1f%% in %s [min=%s, max=%s, sum=%s, avg=%s]',
+                             100. * (1. - size_tot_z/size_tot),
+                             format_time(time_monotonic() - start2),
+                             format_bytes(size_min_z), format_bytes(size_max_z),
+                             format_bytes(size_tot_z), format_bytes(round(size_tot_z/tile_count)))
+
+        try:
+            # OpenLayers doesn't make use of that file so delete it
+            os.unlink(str(Path(mvtname).joinpath('metadata.json')), dir_fd=dir_fd)
+        except FileNotFoundError:
+            pass
+
+        try:
+            # atomically exchange paths
+            rename_exchange(mvtname, dst, olddirfd=dir_fd)
+        except FileNotFoundError:
+            # dst doesn't exist, use normal os.rename() instead
+            os.rename(mvtname, dst, src_dir_fd=dir_fd)
+
+    finally:
+        lyr_dst = None
+        dso = None
+        srs = None
+        for p in (tmpname, mvtname):
+            if os.access(p, F_OK, dir_fd=dir_fd, follow_symlinks=False):
+                logging.debug('rmtree("%s/%s")', tmpdir2, p)
+                shutil.rmtree(p, dir_fd=dir_fd)
+
+        try:
+            os.close(dir_fd)
+        except (OSError, ValueError):
+            logging.exception('Could not close directory')
+
+    if tmpdir is None:
+        logging.debug('rmdir("%s")', tmpdir2)
+        os.rmdir(tmpdir2)
diff --git a/import_source.py b/import_source.py
index 04dea4b..46ec1e9 100644
--- a/import_source.py
+++ b/import_source.py
@@ -449,6 +449,9 @@ class ImportStatus(Enum):
     IMPORT_ERROR = 1
     IMPORT_NOCHANGE = 255
 
+    def __str__(self):
+        return self.name.removeprefix('IMPORT_')
+
 # pylint: disable-next=dangerous-default-value
 def importSources(dso : gdal.Dataset, lyr : ogr.Layer,
                   sources : dict[str,Any] = {},
@@ -651,6 +654,7 @@ def _importSource2(lyr_dst : ogr.Layer, path : str, args : dict[str,Any],
     else:
         # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0
         # when possible, see apps/ogr2ogr_lib.cpp
+        # pylint: disable=duplicate-code
         logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)',
                       srs.GetName(), srs_dst.GetName())
         ct = osr.CoordinateTransformation(srs, srs_dst)
diff --git a/rename_exchange.py b/rename_exchange.py
new file mode 100644
index 0000000..b1b02cb
--- /dev/null
+++ b/rename_exchange.py
@@ -0,0 +1,48 @@
+# © 2022 Nicko van Someren
+# License: MIT
+
+# pylint: disable=missing-module-docstring
+
+from ctypes import CDLL, get_errno
+from os import strerror, PathLike, fsencode
+from platform import machine
+from typing import Optional
+
+SYSCALL_ARCH_MAP = {
+    'x86_64': 316,
+    'armv6l': 382,
+    'armv7l': 382,
+    'aarch64': 276,
+    'i386': 353,
+}
+
+libc = CDLL('libc.so.6', use_errno=True)
+syscall_function = libc.syscall
+ARCH = machine()
+RENAME_EXCHANGE = 2
+AT_FDCWD = -100
+
+if ARCH not in SYSCALL_ARCH_MAP:
+    raise NotImplementedError(f'Unsupported architecture: {ARCH}')
+
+SYSCALL_RENAMEAT2 = SYSCALL_ARCH_MAP[ARCH]
+
+def rename_exchange(oldpath : PathLike,
+                    newpath : PathLike,
+                    olddirfd : Optional[int] = None,
+                    newdirfd : Optional[int] = None) -> None:
+    """Atomically exchange oldpath and newpath.  Both pathnames must
Both pathnames must + exist but may be of different types (e.g., one could be a non-empty + directory and the other a symbolic link).""" + + result = syscall_function( + SYSCALL_RENAMEAT2, + olddirfd if olddirfd is not None else AT_FDCWD, + fsencode(oldpath), + newdirfd if newdirfd is not None else AT_FDCWD, + fsencode(newpath), + RENAME_EXCHANGE, + ) + if result != 0: + errno = get_errno() + raise OSError(errno, strerror(errno)) diff --git a/webmap-import b/webmap-import index dafff78..1b9cde8 100755 --- a/webmap-import +++ b/webmap-import @@ -31,6 +31,7 @@ import re from datetime import datetime, timedelta, timezone, UTC from math import modf from pathlib import Path +from time import monotonic as time_monotonic from typing import Any, Optional, NoReturn import traceback @@ -67,6 +68,7 @@ from import_source import ( importSources, ImportStatus ) +from export_mvt import exportMVT def setFieldIf(cond : bool, attrName : str, @@ -580,6 +582,13 @@ def main() -> NoReturn: help='obtain an exclusive lock before processing') parser.add_argument('--lockdir-sources', default=None, help='optional directory for lock files to source paths') + parser.add_argument('--mvtdir', default=None, + help='optional directory for Mapbox Vector Tiles (MVT)') + parser.add_argument('--mvtdir-tmp', default=None, + help='temporary directory for Mapbox Vector Tiles (MVT); it must exists and be ' + 'on the same file system as the --mvtdir value') + parser.add_argument('--compress-tiles', default=False, action='store_true', + help='whether to compress Mapbox Vector Tiles (MVT) files') parser.add_argument('--force', default=False, action='store_true', help='import even if no new changes were detected') parser.add_argument('groupname', nargs='*', help='group layer name(s) to process') @@ -634,7 +643,16 @@ def main() -> NoReturn: 'support layer creation') createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None)) - cachedir = Path(args.cachedir) if args.cachedir is not None else None + if args.mvtdir is not None: + args.mvtdir = Path(args.mvtdir) + if args.mvtdir == Path(): + raise RuntimeError('Invalid value for --mvtdir') + args.mvtdir.parent.mkdir(parents=True, exist_ok=True) + if args.mvtdir_tmp is not None: + args.mvtdir_tmp = Path(args.mvtdir_tmp) + + if args.cachedir is not None: + args.cachedir = Path(args.cachedir) if args.lockdir_sources is None: sourcePathLocks = None else: @@ -665,15 +683,19 @@ def main() -> NoReturn: rv = 0 try: - r = ImportStatus.IMPORT_NOCHANGE + r = {} + n = 0 + start = time_monotonic() for layername, layerdef in layers.items(): - r0 = processOutputLayer(dso, layername, layerdef, - srs=srs, - cachedir=cachedir, - extent=extent, - dsTransaction=dsoTransaction, - lyrcache=lyr_cache, - force=args.force) + r[layername] = r0 = processOutputLayer(dso, layername, layerdef, + srs=srs, + cachedir=args.cachedir, + extent=extent, + dsTransaction=dsoTransaction, + lyrcache=lyr_cache, + force=args.force) + n += 1 + logging.info('Import result status for layer "%s": %s', layername, str(r0)) if r0 == ImportStatus.IMPORT_ERROR: rv = 1 if dsoTransaction: @@ -682,13 +704,31 @@ def main() -> NoReturn: # no need to catch the exception here if dso.CommitTransaction() != ogr.OGRERR_NONE: logging.error('Could not rollback transaction') - if r == ImportStatus.IMPORT_NOCHANGE and r0 == ImportStatus.IMPORT_SUCCESS: - r = ImportStatus.IMPORT_SUCCESS - logging.info('result = %s', str(r)) + break + elapsed = time_monotonic() - start + logging.info('Processed %d destination layers in %s', n, 
common.format_time(elapsed)) if sourcePathLocks is not None: releaseSourcePathLocks(sourcePathLocks) + export_layers = { l:d for l,d in layers.items() if d.get('publish', None) is not None } + if args.mvtdir is None or any(r0 == ImportStatus.IMPORT_ERROR for r0 in r.values()): + pass + elif len(export_layers) == 0: + logging.warning('--mvtdir option used but no layer has a publication definition') + elif (all(r0 == ImportStatus.IMPORT_NOCHANGE for l,r0 in r.items() if l in export_layers) + and args.mvtdir.is_dir()): + logging.info('Skipping MVT export for group %s (no changes)', + ', '.join(args.groupname) if args.groupname is not None else '*') + else: + exportMVT(dso, layers=export_layers, + tmpdir=args.mvtdir_tmp, + dst=args.mvtdir, + default_options=config.get('vector-tiles', None), + mvtname=(args.groupname[0] if args.groupname is not None and + len(args.groupname) == 1 else 'mvt'), + compress=args.compress_tiles) + if dsoTransaction: dsoTransaction = False logging.debug('Committing transaction') diff --git a/webmap-publish b/webmap-publish deleted file mode 100755 index 8db920b..0000000 --- a/webmap-publish +++ /dev/null @@ -1,775 +0,0 @@ -#!/usr/bin/python3 - -#---------------------------------------------------------------------- -# Backend utilities for the Klimatanalys Norr project (create MVT tiles) -# Copyright © 2024 Guilhem Moulin <info@guilhem.se> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <https://www.gnu.org/licenses/>. -#---------------------------------------------------------------------- - -from os import O_RDONLY, O_WRONLY, O_CREAT, O_EXCL, O_TRUNC, O_CLOEXEC, O_DIRECTORY -import os -import errno -from fcntl import flock, LOCK_EX, LOCK_UN -from string import hexdigits -import json -import logging -import math -import random -import argparse -import tempfile -from pathlib import Path -import numpy - -from osgeo import gdal, ogr, osr -from osgeo.gdalconst import ( - OF_VECTOR as GDAL_OF_VECTOR, - OF_READONLY as GDAL_OF_READONLY, - OF_UPDATE as GDAL_OF_UPDATE, - OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR, -) -gdal.UseExceptions() - -import common -from common import ( - format_bytes, - gdalSetOpenExArgs, - escape_identifier -) - -# Open and return source dataset. -def openSourceDS(def_dict): - # merge 'open-options-publish' into 'open-options' - opts2 = def_dict.get('open-options-publish', None) - if opts2 is not None: - if 'open-options' not in def_dict: - def_dict['open-options'] = {} - def_dict['open-options'] |= opts2 - kwargs, _ = gdalSetOpenExArgs(gdal, def_dict, flags=GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) - - path = def_dict['path'] - logging.debug('OpenEx(%s, %s)', path, str(kwargs)) - return gdal.OpenEx(path, **kwargs) - -# Choose a random subdirectory under the given base directory and create -# a new MVT dataset there. -# The old MVT dataset (if any) remains, which is intentional we don't -# want to disrupt browser caches and tile → attribute mapping. 
-def createMVT(basedir, options=None): - drv = gdal.GetDriverByName('MVT') - if drv is None: - raise Exception('Unknown GDAL driver for format "MVT"') - - # choose an available name at random from 00 to FF - weights = [1] * 256 - for path in basedir.iterdir(): - filename = path.name - if len(filename) == 2 and filename[0] in hexdigits and filename[1] in hexdigits: - n = int(filename, 16) - weights[n] = 0 - if all([w == 0 for w in weights]): - raise Exception(f'No name available in {args.destdir} (forgot to cleanup?)') - - name = random.choices(range(256), weights=weights, k=1)[0] - name = f'{name:0>2x}' - path = str(basedir.joinpath(name)) - - # merge options from the config file - defaults_map = { - 'min-zoom': 'MINZOOM', - 'max-zoom': 'MAXZOOM', - 'max-size': 'MAX_SIZE', - 'max-features': 'MAX_FEATURES', - 'simplification': 'SIMPLIFICATION', - 'simplification-max-zoom': 'SIMPLIFICATION_MAX_ZOOM', - 'tiling-scheme': 'TILING_SCHEME', - 'extent': 'EXTENT', - } - defaults_options = config.get('vector-tiles', None) - if defaults_options is not None: - for k, v in defaults_options.items(): - if k not in defaults_map: - logging.warning('No default map for key "%s", ignoring', k) - continue - if options is None: - options = {} - k = defaults_map[k] - if k not in options: - if isinstance(v, list): - v = ','.join(map(str, v)) - options[k] = v - - kwargs = { 'eType': gdal.GDT_Unknown } - if options is not None: - kwargs['options'] = opts = [] - for k, v in options.items(): - opts.append(k + '=' + str(v)) - - logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs['eType'], - ', options=' + str(kwargs['options']) if 'options' in kwargs else '') - ds = drv.Create(path, 0, 0, 0, **kwargs) - return ds, name - -# Filter a source layer. -def getSourceLayer(ds_src, layerdef, extent=None): - layername = layerdef['__layername-src__'] - lyr_src = ds_src.GetLayerByName(layername) - if lyr_src is None: - raise Exception(f'Source dataset has no layer named "{layername}"') - - count0 = -1 - if lyr_src.TestCapability(ogr.OLCFastFeatureCount): - count0 = lyr_src.GetFeatureCount(force=0) - - srs_src = lyr_src.GetSpatialRef() - if srs_src is None: - raise Exception('Source layer "{}" has no SRS'.format(layername)) - elif not srs_src.IsProjected(): - raise Exception('Source layer "{}" SRS is not projected: {}'.format( - layername, srs_src.ExportToPrettyWkt())) - # "the value to multiply by linear distances to transform them to meters" - linearUnits = srs_src.GetLinearUnits() - logging.debug('Source layer "%s" SRS ("%s") linear units: %f', layername, - srs_src.GetName(), linearUnits) - - # get geometry field of the source layer - defn = lyr_src.GetLayerDefn() - geomFieldCount = defn.GetGeomFieldCount() - if geomFieldCount != 1: - logging.warning('Source layer "%s" has %d != 1 geometry fields', lyr_src.GetName(), geomFieldCount) - geomField = defn.GetGeomFieldDefn(0) - geomType = geomField.GetType() - geomFieldName_esc = escape_identifier(geomField.GetName()) - - # transform extent to source SRS - if extent is None or extent.GetSpatialReference().IsSame(srs_src): - extent_src = extent - else: - extent_src = extent.Clone() - if extent_src.TransformTo(srs_src) != ogr.OGRERR_NONE: - raise Exception('Could not transform extent {} to {}'.format( - extent.ExportToWkt(), srs_src.GetName())) - - - reserved_fields = set(map(lambda x: x.lower(), [ - 'geom', - 'GeomArea', - 'GeomPerimeter', - 'GeomLength', - ])) - - # build SQL query - drv = ds_src.GetDriver() - columns = [] - for i in range(defn.GetFieldCount()): 
- fld = defn.GetFieldDefn(i) - fldType = fld.GetType() - if fld.GetName().lower() in reserved_fields: - raise Exception(f'Invalid column name "{fld.GetName()}"') - fldName_esc = escape_identifier(fld.GetName()) - column = 'm.' + fldName_esc - # for date/time/datetime fields, we let the RDBMS engine do the formatting if possible - if fldType == ogr.OFTDate: - if drv.ShortName == 'PostgreSQL': - column = 'to_char(' + column + ', \'YYYY-MM-DD\') AS ' - elif drv.ShortName == 'GPKG': - column = 'strftime(\'%F\', ' + column + ') AS ' - elif fldType == ogr.OFTTime: - if drv.ShortName == 'PostgreSQL': - column = 'to_char(' + column + ', \'HH24:MI:SSOF\') AS ' - elif drv.ShortName == 'GPKG': - # XXX SQLite's strftime() lacks support for time zones - column = 'strftime(\'%T\', ' + column + ') AS ' - elif fldType == ogr.OFTDateTime: - if drv.ShortName == 'PostgreSQL': - column = 'to_char(' + column + ', \'YYYY-MM-DD"T"HH24:MI:SSOF\') AS ' - elif drv.ShortName == 'GPKG': - # XXX SQLite's strftime() lacks support for time zones - column = 'strftime(\'%FT%T\', ' + column + ') AS ' - if column.endswith(' AS '): - column += fldName_esc - columns.append(column) - - # add columns with computed info about the geometry (before clipping and rounded to 0.01m²/0.01m) - if ogr.GT_IsSubClassOf(geomType, ogr.wkbSurface) or ogr.GT_IsSubClassOf(geomType, ogr.wkbMultiSurface): - columns.append('ROUND(CAST(ST_Area(m.' + geomFieldName_esc + ') * ' - + str(linearUnits) + '*' + str(linearUnits) + ' AS numeric), 2) AS "GeomArea"') - columns.append('ROUND(CAST(ST_Perimeter(m.' + geomFieldName_esc + ') * ' - + str(linearUnits) + ' AS numeric), 2) AS "GeomPerimeter"') - elif ogr.GT_IsSubClassOf(geomType, ogr.wkbCurve) or ogr.GT_IsSubClassOf(geomType, ogr.wkbMultiCurve): - columns.append('ROUND(CAST(ST_Length(m.' + geomFieldName_esc + ') * ' - + str(linearUnits) + ' AS numeric), 2) AS "GeomLength"') - - transform_geometry = layerdef.get('transform-geometry', None) - if transform_geometry is None: - columns.append('m.' + geomFieldName_esc) - elif transform_geometry == 'centroid': - columns.append('ST_Centroid(m.' + geomFieldName_esc + ') AS ' + geomFieldName_esc) - else: - raise Exception(f'Unsupported geometry transformation: {transform_geometry}') - - query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(lyr_src.GetName()) + ' m' - - # add WHERE clauses and spatial filter; for GPKG/SQlite3, the query - # is too complex and the driver can't use ExecuteSQL()'s spatialFilter - # option so we add the ST_Intersects() condition - if drv.ShortName == 'PostgreSQL': - spatialFilter = extent_src - else: - spatialFilter = None - - cond = layerdef.get('where', None) - if (extent_src is not None and spatialFilter is None) or cond is not None: - query += ' WHERE ' - if extent_src is not None and spatialFilter is None: - query += 'ST_Intersects(m.' + geomFieldName_esc + ', \'' + extent_src.ExportToWkt() + '\')' - if cond is not None: - query += ' AND ' - if cond is not None: - query += '(' + cond.strip() + ')' - - logging.debug('SQL query: %s', query) - lyr_src = ds_src.ExecuteSQL(query, spatialFilter=spatialFilter) - - count1 = -1 - if lyr_src.TestCapability(ogr.OLCFastFeatureCount): - count1 = lyr_src.GetFeatureCount(force=0) - if count0 >= 0 and count1 >= 0: - logging.debug('Source layer "%s" has %d features, of which %d are to be exported', - layername, count0, count1) - return lyr_src - -# Export a layer from the source dataset, returning the next FID availble. 
-# TODO bump metadata_maxsize util the compressed version doesn't exceed 64k -def exportLayer(lyr_dst, lyr_src, srs=None, clip=None, layername=None, - coordinate_precision=None, nextfid=0, - # rotate after the feature which bring the file to the 256kiB mark - metadata_maxsize=256*1024, metadata=None): - if srs is None: - raise Exception('Missing target SRS') - if clip is not None: - clip_srs = clip.GetSpatialReference() - if not clip_srs.IsSame(srs): - raise Exception('Clipping geometry {} has SRS {},\nexpected {}'.format( - clip.ExportToWkt(), clip_srs.ExportToPrettyWkt(), srs.ExportToPrettyWkt())) - if metadata is None: - metadata_fp = None - else: - metadata_fp = metadata.get('fp', None) - exportToJson_options = [] - if coordinate_precision is not None: - exportToJson_options = ['COORDINATE_PRECISION=' + str(coordinate_precision)] - metadata_size = metadata.get('cursize', 0) - - srs_src = lyr_src.GetLayerDefn().GetGeomFieldDefn(0).GetSpatialRef() - if srs.IsSame(srs_src): - logging.debug('Both source and destination have the same SRS (%s), skipping coordinate transformation', - srs.GetName()) - ct = None - else: - # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0 - # when possible, see apps/ogr2ogr_lib.cpp - logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)', - srs_src.GetName(), srs.GetName()) - ct = osr.CoordinateTransformation(srs_src, srs) - if ct is None: - raise Exception(f'Could not create transformation from source SRS ({srs_src.GetName()}) ' - + f'to destination SRS ({srs.GetName()})') - - # extract metadata index and filename from nextfid (an exception is - # raised if nextfid doesn't fit in 4 bytes) - bstr = nextfid.to_bytes(4, byteorder='big') - nextfid_idx = int.from_bytes(bstr[-2:], byteorder='big') - nextfid_p = int.from_bytes(bstr[:2], byteorder='big') - metadata_maxlen = 1 << (8*2) - - defn_dst = lyr_dst.GetLayerDefn() - - # we roll our own loop rather than using ogr2ogr/GDALVectorTranslate() - # since we want to do a custom JSON serialization at the same time, - # in order to avoid duplicating properties across tiles or clipping - # overlays at tile boundaries - - count = 0 - feature = lyr_src.GetNextFeature() - while feature is not None: - geom = feature.GetGeometryRef().Clone() - if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE: - raise Exception('Could not apply coordinate transformation') - geom.FlattenTo2D() - - if metadata is not None: - # serialize the feature to JSON and save it - # TODO there is a lot of room for improvement here - # - geometries could be exported to WKB or FlatGeoBuf rather than GeoJSON - # - columns (schema) could be saved in layers.json only, so - # the object would be an array of length n+1 where the - # geometry would be at index 0 or n - # - data could be serialized to binary, cf. 
BJSON or GeoBuf - feature.SetGeometry(geom) - obj = feature.ExportToJson(as_object=True, options=exportToJson_options) - #props = { key:value for key, value in obj['properties'].items() if value is not None } - props = obj['properties'] - props['geom'] = obj['geometry'] - - bstr = json.dumps(props, ensure_ascii=False, separators=(',',':')).encode('utf-8') - - #if nextfid_idx >= metadata_maxlen or nextfid_p * metadata_maxlen + nextfid_idx != nextfid: - # raise Exception(f'impossible: {nextfid_p}*{metadata_maxlen} + {nextfid_idx} != {nextfid}') - - if metadata_fp is None: - metadata_fp = openNewMetadaFile(metadata, nextfid_p) - metadata_size = 0 - else: - metadata_fp.write(b',') - - metadata_size += 1 + metadata_fp.write(bstr) - - # clip the feature and export it (without fields to MVT) - geom2 = geom if clip is None else geom.Intersection(clip) - feature2 = ogr.Feature(defn_dst) - feature2.SetGeometryDirectly(geom2) - feature2.SetFID(nextfid) - - if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: - raise Exception(f'Could not transfer source feature #{feature.GetFID()}') - - nextfid += 1 - if metadata is not None: - nextfid_idx += 1 - if metadata_size > metadata_maxsize or nextfid_idx >= metadata_maxlen: - closeMetadataFile(metadata) - metadata_fp = metadata_size = None - nextfid_p += 1 - nextfid_idx = 0 - nextfid = nextfid_p * metadata_maxlen - # will open a new while when processiong the next feature - - count += 1 - feature = lyr_src.GetNextFeature() - - logging.info('Exported %d features to layer "%s" from "%s"', count, - lyr_dst.GetName(), layername) - - if metadata is not None: - # save size of the metadata file currently open for proper accounting - metadata['cursize'] = 0 if metadata_fp is None else metadata_size - return nextfid, count - -# Open a new metadata file. -def openNewMetadaFile(metadata, nextfid_p): - name = nextfid_p.to_bytes(length=2, byteorder='big').hex() - name = Path(metadata['tempdir']).joinpath(name).with_suffix('.json') - logging.debug('open(%s)', str(name)) - - fd = os.open(name, O_WRONLY|O_CREAT|O_EXCL|O_CLOEXEC, mode=0o0644, - dir_fd=metadata.get('dirfd', None)) - metadata['fp'] = fp = os.fdopen(fd, mode='wb', closefd=True) - fp.write(b'[') - return fp - -# Close a metadata file. -def closeMetadataFile(metadata): - fp = metadata.pop('fp') - fp.write(b']') - logging.debug('close(%d)', fp.fileno()) - fp.close() - -# Wrapper around os.write() that loops until the entire data is written. -def write_all(fd, buf): - offset = 0 - while offset < len(buf): - try: - n = os.write(fd, buf[offset:]) - except IOError as e: - if e.errno != errno.EAGAIN: - raise e - #logging.debug('Got EAGAIN while trying to write') - else: - #logging.debug('Wrote %d bytes of %d', n, len(buf) - offset) - offset += n - return offset - -# Compress a file to Brotli. 
-# TODO increase chunksize to see if helps with performances -def compress_brotli(path, suffix='.br', dir_fd=None, chunksize=65536, - oFlags=0, **kwargs): - compressor = brotli.Compressor(**kwargs) - fd_in = os.open(path, O_RDONLY|O_CLOEXEC, dir_fd=dir_fd) - size_in = size_out = 0 - try: - # hard-code the mode to save an fstat(2) call - fd_out = os.open(path + suffix, O_WRONLY|O_CREAT|O_CLOEXEC|oFlags, - mode=0o0644, dir_fd=dir_fd) - try: - buf_in = os.read(fd_in, chunksize) - while len(buf_in) > 0: - #logging.debug('Read %d bytes of %d', len(buf_in), chunksize) - size_in += len(buf_in) - size_out += write_all(fd_out, compressor.process(buf_in)) - buf_in = os.read(fd_in, chunksize) - size_out += write_all(fd_out, compressor.finish()) - finally: - os.close(fd_out) - finally: - os.close(fd_in) - return size_in, size_out - - -if __name__ == '__main__': - common.init_logger(app=os.path.basename(__file__), level=logging.INFO) - - parser = argparse.ArgumentParser(description='Export GIS layers to Mapbox Vector Tiles (MVT).') - parser.add_argument('--debug', action='count', default=0, - help=argparse.SUPPRESS) - parser.add_argument('--destdir', default=os.curdir, - help=f'destination directory for generated files (default: {os.curdir})') - parser.add_argument('--lockfile', default=None, - help=f'obtain an exclusive lock while exporting features to MVT') - parser.add_argument('--name', default=None, - help=f'name to use in the layer map (default: last component of DESTDIR)') - parser.add_argument('--webroot', default=None, - help=f'path to the web root') - parser.add_argument('--metadata', default=None, - help=f'path to the JSON metadata file (default: DESTDIR/../metadata.json)') - parser.add_argument('--metadata-lockfile', default=None, - help=f'path to the JSON metadata lock file (default: DESTDIR/../.metadata.json.lck)') - parser.add_argument('--compress', action=argparse.BooleanOptionalAction, default=False, - help='whether to compress tiles (default: False)') - parser.add_argument('groupname', nargs='*', help='group layer name(s) to process') - args = parser.parse_args() - - if args.debug > 0: - logging.getLogger().setLevel(logging.DEBUG) - if args.debug > 1: - from http.client import HTTPConnection - HTTPConnection.debuglevel = 1 - requests_log = logging.getLogger('urllib3') - requests_log.setLevel(logging.DEBUG) - requests_log.propagate = True - - if args.debug > 0: - logging.getLogger().setLevel(logging.DEBUG) - if args.debug > 1: - gdal.ConfigurePythonLogging(enable_debug=True) - - if args.compress: - # fail early if the module is missing - import brotli - - destdirname = args.name if args.name is not None else Path(args.destdir).name - if not destdirname: - logging.error('Could not infer --name value') - exit(1) - - config = common.get_config(groupnames=None if args.groupname == [] else args.groupname) - - # validate configuration - if 'dataset' not in config: - raise Exception('Configuration does not specify source dataset') - - export_layers = {} - mvtconf = {} - for layername, layerdef in config.get('layers', {}).items(): - exportdef = layerdef.get('publish', None) - if exportdef is None: - raise Exception(f'Layer "{layername}" has no publication definition') - if isinstance(exportdef, str): - exportdef = { exportdef:{} } - elif isinstance(exportdef, list): - exportdef = { l:{} for l in exportdef } - for export_layername, export_layerdef in exportdef.items(): - if export_layername in export_layers: - raise Exception(f'Duplicate definition for {export_layername}') - x = {} - for k in 
('target_name', 'minzoom', 'maxzoom'): - if k in export_layerdef: - x[k] = export_layerdef[k] - mvtconf[export_layername] = x - export_layers[export_layername] = export_layerdef - export_layerdef['__layername-src__'] = layername - - if args.destdir != os.curdir: - try: - os.mkdir(args.destdir, mode=0o0755) - except FileExistsError: - pass - - # open source DS - ds = openSourceDS(config['dataset']) - - # get configured Spatial Reference System and extent; force traditional GIS order - # on the target SRS since it's what MVT and JSON export are expecting - srs = common.getSRS(osr, config.get('SRS', None)) - srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) - extent, extent_simple = common.getExtent(config.get('extent', None), srs=srs) - - if srs.IsProjected(): - # will export with centimeter precision - linearUnits = srs.GetLinearUnits() - coordinate_precision = math.ceil(math.log10(linearUnits) + 2) - if coordinate_precision < 0: - coordinate_precision = 0 - logging.debug('Coordinate precision: %d', coordinate_precision) - - if args.lockfile is not None: - lock_fd = os.open(args.lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) - logging.debug('flock("%s", LOCK_EX)', args.lockfile) - flock(lock_fd, LOCK_EX) - - # always lock destdir (until the program exits) to avoid concurrent - # generations of the same tileset - destdir_fd = os.open(args.destdir, O_RDONLY|O_CLOEXEC|O_DIRECTORY) - logging.debug('flock("%s", LOCK_EX)', args.destdir) - flock(destdir_fd, LOCK_EX) - - nFeatures = 0 - tile_extension = '.pbf' - basedir = Path(f'/proc/self/fd/{destdir_fd}') - with tempfile.TemporaryDirectory() as tempdir: - # use a temporary directory for TEMPORARY_DB - logging.debug('Using "%s" as temporary directory', tempdir) - - ds_dst, dirname = createMVT(basedir, options={ - 'FORMAT': 'DIRECTORY', - # OpenLayers doesn't support compressed tiles, so instead - # we use brotli_static to let the browser transparently - # uncompress the responses - 'COMPRESS': 'NO', - 'TYPE': 'overlay', - 'BUFFER': 32, - 'TILE_EXTENSION': tile_extension.removeprefix('.'), - 'TEMPORARY_DB': Path(tempdir).joinpath('tmp-mvt.db'), - 'CONF': json.dumps(mvtconf, ensure_ascii=False, separators=(',',':')), - }) - - # use a temporary directory for the metadata to avoid messing - # around with he MVT directory while building is in progress - metadata = { 'dirfd': destdir_fd } - metadata['tempdir'] = Path(tempfile.mkdtemp(prefix='.' 
+ dirname + '-attr.', - dir=basedir)).relative_to(basedir) - try: - logging.debug('Using "%s" as temporary metadata directory', metadata['tempdir']) - os.chmod(metadata['tempdir'], 0o0755, dir_fd=metadata.get('dirfd', None), - follow_symlinks=False) - nextfid = 0 - for layername, layerdef in export_layers.items(): - # create destination layer - lyr_dst = ds_dst.CreateLayer(layername, srs=srs, geom_type=ogr.wkbUnknown) - if lyr_dst is None: - raise Exception(f'Could not create destination layer "{layername}"') - - lyr_src = getSourceLayer(ds, layerdef, extent=extent) - try: - # use the non-dense geometry (a rectangle) for clipping as it's faster - # and there is no need to reproject (the SRSs match) - nextfid, count0 = exportLayer(lyr_dst, lyr_src, - layername=layerdef['__layername-src__'], - srs=srs, clip=extent_simple, - coordinate_precision=coordinate_precision, - nextfid=nextfid, metadata=metadata) - nFeatures += count0 - finally: - ds.ReleaseResultSet(lyr_src) - lyr_dst = None - - # close datasets - ds = None - ds_dst = None - - # move metadata to the MVT structure (now that it has been closed) - logging.debug('rename(%s/%s, %s/%s/attr)', - args.destdir, metadata['tempdir'], - args.destdir, dirname) - os.rename(metadata['tempdir'], - Path(dirname).joinpath('attr'), - src_dir_fd=metadata.get('dirfd', None), - dst_dir_fd=destdir_fd) - metadata.pop('tempdir', None) - - except Exception as e: - # cleanup - for root, dirnames, filenames, rootfd in os.fwalk(top=metadata['tempdir'], - dir_fd=metadata.get('dirfd', None), topdown=False, - follow_symlinks=False): - for filename in filenames: - #logging.debug('unlink(%s/%s)', root, filename) - os.unlink(filename, dir_fd=rootfd) - for dirname in dirnames: - #logging.debug('rmdir(%s/%s)', root, dirname) - os.rmdir(dirname, dir_fd=rootfd) - #logging.debug('rmdir(%s/%s)', args.destdir, metadata['tempdir']) - os.rmdir(metadata['tempdir'], dir_fd=metadata.get('dirfd', None)) - raise e - - finally: - if 'fp' in metadata: - closeMetadataFile(metadata) - - srs = None - extent = None - extent_simple = None - - try: - # OpenLayers doesn't make use of that file so delete it - os.unlink(Path(dirname).joinpath('metadata.json'), dir_fd=destdir_fd) - except FileNotFoundError: - pass - - logging.info('Done creating tiles and attribute files (total %d features)', nFeatures) - - if args.lockfile is not None: - # release the shared lock, the remaining shouldn't be as demanding - logging.debug('flock("%s", LOCK_UN)', args.lockfile) - flock(lock_fd, LOCK_UN) - - - stats_tiles = [] - stats_metadata = [] - if args.compress: - compress_mode = brotli.MODE_GENERIC - - # collect stats and optionally compress - # TODO: optimize traversal, cf. - # https://r3n.hashnode.dev/tech-chronicles-conquer-memory-monsters-with-pythons-yield-in-large-directory-processing - # TODO: benchmark brotli quality to trade-off compression ratio vs. 
execution time - # (also, tile and attribute files can have different levels if needs be) - for root, _, filenames, rootfd in os.fwalk(top=dirname, dir_fd=destdir_fd, - follow_symlinks=False): - for filename in filenames: - lfilename = filename.lower() - if lfilename.endswith(tile_extension): - arr = stats_tiles - elif lfilename.endswith('.json'): - arr = stats_metadata - else: - continue - - if not args.compress: - st = os.stat(filename, dir_fd=rootfd) - arr.append(st.st_size) - else: - #logging.debug("Compressing %s/%s", root, filename) - szi, szo = compress_brotli(filename, dir_fd=rootfd, - oFlags=O_EXCL, mode=compress_mode, quality=5) - arr.append((szi, szo)) - - # display stats - if len(stats_tiles) > 0: - stats_tiles = numpy.array(stats_tiles, dtype=numpy.int64, copy=False) - - arr = stats_tiles[:,0] if args.compress else stats_tiles - tot = numpy.sum(arr) - logging.info('Tile count: %d [min=%s, max=%s, sum=%s, avg=%s]', - arr.size, - format_bytes(numpy.min(arr)), format_bytes(numpy.max(arr)), - format_bytes(tot), format_bytes(round(tot/arr.size))) - - if args.compress: - arr = stats_tiles[:,1] - ztot = numpy.sum(arr) - logging.info('After compression: min=%s, max=%s, sum=%s, avg=%s (ratio %.1f%%)', - format_bytes(numpy.min(arr)), format_bytes(numpy.max(arr)), - format_bytes(ztot), format_bytes(round(ztot/arr.size)), - 100. * (1. - ztot/tot)) - - if len(stats_metadata) > 0: - stats_metadata = numpy.array(stats_metadata, dtype=numpy.int64, copy=False) - - arr = stats_metadata[:,0] if args.compress else stats_metadata - tot = numpy.sum(arr) - logging.info('Metadata file count: %d [min=%s, max=%s, sum=%s, avg=%s]', - arr.size, - format_bytes(numpy.min(arr)), format_bytes(numpy.max(arr)), - format_bytes(tot), format_bytes(round(tot/arr.size))) - - if args.compress: - arr = stats_metadata[:,1] - ztot = numpy.sum(arr) - logging.info('After compression: min=%s, max=%s, sum=%s, avg=%s (ratio %.1f%%)', - format_bytes(numpy.min(arr)), format_bytes(numpy.max(arr)), - format_bytes(ztot), format_bytes(round(ztot/arr.size)), - 100. * (1. - ztot/tot)) - - - # update metadata.json - if args.metadata is None: - mapfile = Path(args.destdir).parent.joinpath('metadata.json') - else: - mapfile = Path(args.metadata) - - # first, place an exclusive lock (it is important that all - # invocations of the program with the same --metadata file, even on - # different layer groups, share the same lock file to avoid - # concurrent updates of the medata.json) - if args.metadata_lockfile is None: - mapfile_lock = mapfile.with_name('.' 
+ mapfile.name + '.lck') - else: - mapfile_lock = Path(args.metadata_lockfile) - mapfile_lock_fd = os.open(mapfile_lock, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) - logging.debug('flock("%s", LOCK_EX)', mapfile_lock) - flock(mapfile_lock_fd, LOCK_EX) - - # load the file - try: - with open(mapfile, mode='rb') as fp: - mapobj = json.load(fp) - except FileNotFoundError: - mapobj = {} - - # remove the compressed version as we are prepare to update the - # uncompressed one - mapfile_compressed = mapfile.with_name(mapfile.name + '.br') - try: - logging.debug('unlink(%s)', mapfile_compressed) - os.unlink(mapfile_compressed) - except FileNotFoundError: - pass - - k = 'layermap' - if k not in mapobj: - layermap = mapobj[k] = {} - else: - layermap = mapobj[k] - destpath = Path(args.destdir) - if args.webroot is not None: - destpath = destpath.relative_to(args.webroot) - destpath = '/' + destpath.as_posix().removeprefix('/') + '/' + dirname - layermap[destdirname] = destpath - - # dump the new object to the temporary file - bstr = json.dumps(mapobj, ensure_ascii=False, separators=(',',':')).encode('utf-8') - mapfile_tmp = mapfile.with_stem('.' + mapfile.stem + '.new') - fd = os.open(mapfile_tmp, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) - with os.fdopen(fd, mode='wb', closefd=True) as fp: - fp.write(bstr) - - # compress the temporary file - if args.compress: - mapfile_tmp_compressed = mapfile_tmp.with_name(mapfile_tmp.name + '.br') - compressor = brotli.Compressor() - fd = os.open(mapfile_tmp_compressed, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644) - try: - write_all(fd, compressor.process(bstr)) - write_all(fd, compressor.finish()) - finally: - os.close(fd) - - # rename temporary files to production (starting with the uncompressed one) - logging.debug('rename(%s, %s)', mapfile_tmp, mapfile) - os.rename(mapfile_tmp, mapfile) - - if args.compress: - logging.debug('rename(%s, %s)', mapfile_tmp_compressed, mapfile_compressed) - os.rename(mapfile_tmp_compressed, mapfile_compressed) |
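
In contrast with the deleted webmap-publish above, which published each build into a fresh randomly-named subdirectory, the new flow swaps the finished tile directory into place atomically. A hedged usage sketch of the rename_exchange() helper added in this commit (the directory names are illustrative):

```python
import os
from rename_exchange import rename_exchange

def publish(tmpdir_fd: int, built: str = 'mvt', live: str = '/srv/tiles/mvt') -> None:
    """Swap a freshly-built tile directory into place."""
    try:
        # renameat2(..., RENAME_EXCHANGE): both paths change in one step,
        # so clients never observe a half-written tileset
        rename_exchange(built, live, olddirfd=tmpdir_fd)
    except FileNotFoundError:
        # first deployment: 'live' doesn't exist yet, a plain rename suffices
        os.rename(built, live, src_dir_fd=tmpdir_fd)
```

This mirrors the pattern exportMVT() uses at the end of a successful export, and is why the temporary build directory must live on the same file system as the destination.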