diff options
Diffstat (limited to 'export_mvt.py')
-rw-r--r-- | export_mvt.py | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/export_mvt.py b/export_mvt.py new file mode 100644 index 0000000..e9b54f4 --- /dev/null +++ b/export_mvt.py @@ -0,0 +1,474 @@ +#!/usr/bin/python3 + +#---------------------------------------------------------------------- +# Backend utilities for the Klimatanalys Norr project (create MVT tiles) +# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +#---------------------------------------------------------------------- + +# pylint: disable=invalid-name, missing-module-docstring, fixme + +from os import O_RDONLY, O_WRONLY, O_CREAT, O_EXCL, O_CLOEXEC, O_DIRECTORY, F_OK +import os +from errno import EAGAIN +from fcntl import flock, LOCK_EX +import json +import logging +from pathlib import Path +import shutil +import tempfile +from typing import Any, Iterator, Optional +from time import monotonic as time_monotonic + +import brotli +from osgeo import gdal, ogr, osr + +from common import BadConfiguration, escape_identifier, format_bytes, format_time +from common_gdal import getExtent, getSRS, getSpatialFilterFromGeometry +from rename_exchange import rename_exchange + +def parseTilingScheme(scheme : list[Any]) -> tuple[osr.SpatialReference, ogr.Geometry|None]: + """Get SRS end extent from MVT tiling scheme (crs, tile_origin_upper_left_x, + tile_origin_upper_left_y, tile_dimension_zoom_0).""" + if scheme is None: + srs = osr.SpatialReference() + # standard WebMercator tiling scheme + srs.ImportFromEPSG(3857) + return srs, None + if not isinstance(scheme, list) or len(scheme) != 4: + raise BadConfiguration(f'Invalid tiling scheme: {scheme}') + srs = getSRS(scheme[0]) + if not srs.IsProjected(): + raise RuntimeError('SRS is not projected: ' + srs.ExportToPrettyWkt()) + + minX = minY = maxX = maxY = None + for i in range(srs.GetAxesCount()): + orientation = srs.GetAxisOrientation('PROJCS', i) + if orientation == osr.OAO_North: + assert minY is None and maxY is None + maxY = scheme[2] + minY = maxY - scheme[3] + elif orientation == osr.OAO_South: + assert minY is None and maxY is None + minY = scheme[2] + maxY = minY + scheme[3] + elif orientation == osr.OAO_East: + assert minX is None and maxX is None + minX = scheme[1] + maxX = minX + scheme[3] + elif orientation == osr.OAO_West: + assert minX is None and maxX is None + maxX = scheme[1] + minX = maxX - scheme[3] + if minX is None or minY is None or maxX is None or maxY is None: + raise RuntimeError('Unknown axes meaning for ' + srs.ExportToPrettyWkt()) + extent = (minX, minY, maxX, maxY) + logging.debug('Parsed tiling scheme: extent=%s, srs="%s"', str(extent), srs.GetName()) + return srs, getExtent(extent, srs=srs) + +# pylint: disable-next=dangerous-default-value +def createMVT(drv : gdal.Driver, path : str, + options : dict[str,str|int|float|None] = {}, + default_options : dict[str,Any]|None = None) -> gdal.Dataset: + """Create and return the MVT file/directory (which must not exist).""" + # merge options from the config file + default_map = { + 'min-zoom': 'MINZOOM', + 'max-zoom': 'MAXZOOM', + 'max-size': 'MAX_SIZE', + 'max-features': 'MAX_FEATURES', + 'simplification': 'SIMPLIFICATION', + 'simplification-max-zoom': 'SIMPLIFICATION_MAX_ZOOM', + 'tiling-scheme': 'TILING_SCHEME', + 'extent': 'EXTENT', + } + if default_options is not None: + for k, v in default_options.items(): + if k not in default_map: + logging.warning('No default map for key "%s", ignoring', k) + continue + if options is None: + options = {} + k = default_map[k] + if k not in options: + if isinstance(v, list): + v = ','.join(map(str, v)) + options[k] = v + + kwargs = { 'eType': gdal.GDT_Unknown } + if options is not None: + kwargs['options'] = opts = [] + for k, v in options.items(): + opts.append(k + '=' + str(v)) + + logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs['eType'], + ', options=' + str(kwargs['options']) if 'options' in kwargs else '') + return drv.Create(path, 0, 0, 0, **kwargs) + +# pylint: disable-next=too-many-branches +def exportSourceLayer(ds_src : gdal.Dataset, + lyr_src : ogr.Layer, + lyr_dst : ogr.Layer, + layerdef : dict[str,Any], + extent : ogr.Geometry|None = None) -> int: + """Export a source layer.""" + count0 = -1 + if lyr_src.TestCapability(ogr.OLCFastFeatureCount): + count0 = lyr_src.GetFeatureCount(force=0) + + layername = lyr_src.GetName() + srs_src = lyr_src.GetSpatialRef() + if srs_src is None: + raise RuntimeError(f'Source layer "{layername}" has no SRS') + srs_dst = lyr_dst.GetSpatialRef() + if srs_dst is None: + raise RuntimeError(f'Destination layer "{lyr_dst.GetName()}" has no SRS') + + if srs_dst.IsSame(srs_src): + logging.debug('Source and destination have the same SRS (%s), ' + 'skipping coordinate transformation', + srs_dst.GetName()) + ct = None + else: + # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0 + # when possible, see apps/ogr2ogr_lib.cpp + # pylint: disable=duplicate-code + logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)', + srs_src.GetName(), srs_dst.GetName()) + ct = osr.CoordinateTransformation(srs_src, srs_dst) + if ct is None: + raise RuntimeError('Could not create transformation from source SRS ' + f'({srs_src.GetName()}) to destination SRS ({srs_dst.GetName()})') + + # get geometry field of the source layer + defn = lyr_src.GetLayerDefn() + geomFieldCount = defn.GetGeomFieldCount() + if geomFieldCount != 1: + logging.warning('Source layer "%s" has %d != 1 geometry fields', + layername, geomFieldCount) + if geomFieldCount == 0: + return 0 + geomField = defn.GetGeomFieldDefn(0) + + if extent is None: + spatialFilter = None + clip = None + else: + # transform extent to source SRS + spatialFilter = getSpatialFilterFromGeometry(extent, srs_src) + # ensure clipping geometries remain simple (mere rectangles) + clip = getSpatialFilterFromGeometry(extent, srs_dst, fail_if_srs_not_equivalent = True) + + transform_geometry = layerdef.get('transform-geometry', None) + columns = [ 'm.' + escape_identifier(lyr_src.GetFIDColumn()) ] + geomFieldName_esc = escape_identifier(geomField.GetName()) + if transform_geometry is None: + columns.append('m.' + geomFieldName_esc) + elif transform_geometry == 'centroid': + columns.append('ST_Centroid(m.' + geomFieldName_esc + ') AS ' + geomFieldName_esc) + else: + raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}') + + query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(layername) + ' m' + + cond = layerdef.get('where', None) + if cond is not None: + query += ' WHERE ' + cond.strip() + + logging.debug('ExecuteSQL(%s%s)', query, + '' if spatialFilter is None else ', spatialFilter=' + spatialFilter.ExportToWkt()) + lyr_src = ds_src.ExecuteSQL(query, spatialFilter=spatialFilter) + try: + count1 = -1 + if lyr_src.TestCapability(ogr.OLCFastFeatureCount): + count1 = lyr_src.GetFeatureCount(force=0) + if count0 >= 0 and count1 >= 0: + logging.debug('Source layer "%s" has %d features, of which %d are to be exported', + layername, count0, count1) + + defn_dst = lyr_dst.GetLayerDefn() + eGType_dst = defn_dst.GetGeomType() + dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown + + count = count_degenerates = 0 + feature = lyr_src.GetNextFeature() + while feature is not None: + geom = feature.GetGeometryRef().Clone() + if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE: + raise RuntimeError('Could not apply coordinate transformation') + geom.FlattenTo2D() + if clip is not None: + # GDAL generates tiles beyond the tiling scheme extents so we need to + # clip manually + dim = geom.GetDimension() + geom = geom.Intersection(clip) + if geom.GetDimension() != dim: + # degenerate intersection, skip feature + count_degenerates += 1 + feature = lyr_src.GetNextFeature() + continue + if not dGeomIsUnknown: + geom = ogr.ForceTo(geom, eGType_dst) + + feature2 = ogr.Feature(defn_dst) + feature2.SetGeometryDirectly(geom) + feature2.SetFID(feature.GetFID()) + if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE: + raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}') + count += 1 + feature = lyr_src.GetNextFeature() + + logging.info('Exported %d features (%d degenerate skipped) to MVT layer "%s" from "%s"', + count, count_degenerates, lyr_dst.GetName(), layername) + + finally: + ds_src.ReleaseResultSet(lyr_src) + lyr_src = None + return count + +def list_files(top : str, + dir_fd : Optional[int] = None, + follow_symlinks : bool = False, + ext : Optional[str] = None) -> Iterator[tuple[str,int]]: + """Generator of filenames and dir_fds.""" + for _dirname, _, filenames, dir_fd2 in os.fwalk(top=top, + dir_fd=dir_fd, + follow_symlinks=follow_symlinks): + for filename in filenames: + if ext is None or filename.lower().endswith(ext): + yield filename, dir_fd2 + +def write_all(fd : int, buf : bytes) -> int: + """Wrapper around os.write() that loops until the entire data is written.""" + offset = 0 + while offset < len(buf): + try: + n = os.write(fd, buf[offset:]) + except IOError as e: + if e.errno != EAGAIN: + raise e + #logging.debug('Got EAGAIN while trying to write') + else: + #logging.debug('Wrote %d bytes of %d', n, len(buf) - offset) + offset += n + return offset + +def compress_brotli(path : str, + suffix : str = '.br', + dir_fd : Optional[int] = None, + # TODO increase chunksize to see if helps with performance + chunksize : int = 65536, + oFlags : int = 0, + **kwargs) -> tuple[int, int]: + """Compress a file to Brotli, returning a pair of original and compressed size.""" + compressor = brotli.Compressor(**kwargs) + fd_in = os.open(path, O_RDONLY|O_CLOEXEC, dir_fd=dir_fd) + size_in = size_out = 0 + try: + # hard-code the mode to save an fstat(2) call + fd_out = os.open(path + suffix, O_WRONLY|O_CREAT|O_CLOEXEC|oFlags, + mode=0o0644, dir_fd=dir_fd) + try: + buf_in = os.read(fd_in, chunksize) + while len(buf_in) > 0: + #logging.debug('Read %d bytes of %d', len(buf_in), chunksize) + size_in += len(buf_in) + size_out += write_all(fd_out, compressor.process(buf_in)) + buf_in = os.read(fd_in, chunksize) + size_out += write_all(fd_out, compressor.finish()) + finally: + os.close(fd_out) + finally: + os.close(fd_in) + return size_in, size_out + +# pylint: disable-next=too-many-branches, too-many-statements +def exportMVT(ds : gdal.Dataset, + layers : dict[str,dict[str,Any]], + tmpdir : Path|None, + dst : Path, + mvtname : str = 'mvt', + drvname : str = 'MVT', + default_options : dict[str,Any]|None = None, + tile_extension : str = '.pbf', + compress : bool = False) -> None: + """Export some layers to MVT.""" + drv = gdal.GetDriverByName(drvname) + if drv is None: + raise RuntimeError(f'Unknown GDAL driver for format "{drvname}"') + + srs, extent = parseTilingScheme(default_options.get('tiling-scheme', None)) + + export_layers = {} + mvtconf = {} + for layername, layerdef in layers.items(): + exportdef = layerdef.get('publish', None) + if exportdef is None: + raise RuntimeError(f'Layer "{layername}" has no publication definition') + if isinstance(exportdef, str): + exportdef = { exportdef:{} } + elif isinstance(exportdef, list): + exportdef = { l:{} for l in exportdef } + for export_layername, export_layerdef in exportdef.items(): + if export_layername in export_layers: + raise RuntimeError(f'Duplicate definition for {export_layername}') + x = {} + for k in ('target_name', 'minzoom', 'maxzoom'): + if k in export_layerdef: + x[k] = export_layerdef[k] + mvtconf[export_layername] = x + export_layers[export_layername] = (layername, export_layerdef) + + if tmpdir is None: + # use a temporary directory in the parent to make sure we can + # atomically rename/exchange directories + tmpdir2 = tempfile.mkdtemp(prefix='.tmp.mvt-', dir=str(dst.parent)) + logging.debug('Using "%s" as temporary directory for MVT', tmpdir2) + else: + # assume it already exists (it's shared by all invocation of the program) + tmpdir2 = str(tmpdir) + + dir_fd = os.open(tmpdir2, O_RDONLY|O_CLOEXEC|O_DIRECTORY) + try: + if tmpdir is not None: + logging.debug('flock("%s", LOCK_EX)', tmpdir2) + flock(dir_fd, LOCK_EX) + with os.scandir(dir_fd) as it: + if next(it, None) is not None: + logging.warning('Temporary directory "%s" exists and is not empty', + str(tmpdir)) + + # createMVT() crashes if mvtname exists so that's also what we want here + tmpname = '.' + mvtname + '-tmp' + os.mkdir(tmpname, mode=0o700, dir_fd=dir_fd) + + start = time_monotonic() + basedir = Path(f'/proc/self/fd/{dir_fd}') + # gdal.create() raises an exception when path exists, and that's what we want + dso = createMVT(drv, path=str(basedir.joinpath(mvtname)), + default_options=default_options, + options = { + 'FORMAT': 'DIRECTORY', + # OpenLayers doesn't support compressed tiles, so instead + # we use brotli_static to let the browser transparently + # uncompress the responses + 'COMPRESS': 'NO', + 'TYPE': 'overlay', + 'BUFFER': 32, + 'TILE_EXTENSION': tile_extension.removeprefix('.'), + 'TEMPORARY_DB': str(basedir.joinpath(tmpname).joinpath('tmp-mvt.db')), + 'CONF': json.dumps(mvtconf, ensure_ascii=False, separators=(',',':')), + }) + + layer_count = feature_count = 0 + for layername, (layername_src, layerdef) in export_layers.items(): + # create destination layer + lyr_src = ds.GetLayerByName(layername_src) + if lyr_src is None: + raise RuntimeError(f'Source dataset has no layer named "{layername_src}"') + transform_geometry = layerdef.get('transform-geometry', None) + if transform_geometry is None: + geom_type = ogr.GT_Flatten(lyr_src.GetGeomType()) + elif transform_geometry == 'centroid': + geom_type = ogr.wkbPoint + else: + raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}') + logging.debug('Creating destination MVT layer "%s" with SRS %s and type %s', + layername, srs.GetName(), ogr.GeometryTypeToName(geom_type)) + lyr_dst = dso.CreateLayer(layername, srs=srs, geom_type=geom_type) + if lyr_dst is None: + raise RuntimeError(f'Could not create destination layer "{layername}"') + + # TODO: GDAL exports features to a temporary SQLite database even though the source + # is PostGIS hence is able to generate MVT with ST_AsMVT(). Letting PostGIS generate + # tiles is likely to speed up things. + feature_count += exportSourceLayer(ds, lyr_src, lyr_dst, layerdef, extent=extent) + layer_count += 1 + lyr_dst = None + lyr_src = None + + dso = None + start2 = time_monotonic() + logging.info('Exported %d features to %d MVT layers in %s', + feature_count, layer_count, format_time(start2 - start)) + + tile_count = size_tot = size_tot_z = 0 + size_min = size_max = size_min_z = size_max_z = None + for filename, dir_fd2 in list_files(top=mvtname, dir_fd=dir_fd, + follow_symlinks=False, + ext=tile_extension): + tile_count += 1 + if not compress: + szi = os.stat(filename, dir_fd=dir_fd2).st_size + else: + # TODO: benchmark brotli quality to trade-off compression ratio vs. execution time + szi, szo = compress_brotli(filename, + dir_fd=dir_fd2, + oFlags=O_EXCL, + mode=brotli.MODE_GENERIC, + quality=5) + size_tot_z += szo + if size_min_z is None or szo < size_min_z: + size_min_z = szo + if size_max_z is None or size_max_z < szo: + size_max_z = szo + size_tot += szi + if size_min is None or szi < size_min: + size_min = szi + if size_max is None or size_max < szi: + size_max = szi + + if tile_count > 0: + logging.info('Tile count: %d [min=%s, max=%s, sum=%s, avg=%s]', + tile_count, + format_bytes(size_min), format_bytes(size_max), + format_bytes(size_tot), format_bytes(round(size_tot/tile_count))) + if compress: + logging.info('Compression ratio: %.1f%% in %s [min=%s, max=%s, sum=%s, avg=%s]', + 100. * (1. - size_tot_z/size_tot), + format_time(time_monotonic() - start2), + format_bytes(size_min_z), format_bytes(size_max_z), + format_bytes(size_tot_z), format_bytes(round(size_tot_z/tile_count))) + + try: + # OpenLayers doesn't make use of that file so delete it + os.unlink(str(Path(mvtname).joinpath('metadata.json')), dir_fd=dir_fd) + except FileNotFoundError: + pass + + try: + # atomically exchange paths + rename_exchange(mvtname, dst, olddirfd=dir_fd) + except FileNotFoundError: + # dst doesn't exist, use normal os.rename() instead + os.rename(mvtname, dst, src_dir_fd=dir_fd) + + finally: + lyr_dst = None + dso = None + srs = None + for p in (tmpname, mvtname): + if os.access(p, F_OK, dir_fd=dir_fd, follow_symlinks=False): + logging.debug('rmtree("%s/%s")', tmpdir2, p) + shutil.rmtree(p, dir_fd=dir_fd) + + try: + os.close(dir_fd) + except (OSError, ValueError): + logging.exception('Could not close directory') + + if tmpdir is None: + logging.debug('rmdir("%s")', tmpdir2) + os.rmdir(tmpdir2) |