#!/usr/bin/python3
#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (create MVT tiles)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_EXCL, O_CLOEXEC, O_DIRECTORY, F_OK
import os
from errno import EAGAIN
from fcntl import flock, LOCK_EX
import json
import logging
from pathlib import Path
import shutil
import tempfile
from typing import Any, Iterator, Optional
from time import monotonic as time_monotonic

import brotli
from osgeo import gdal, ogr, osr

from common import BadConfiguration, escape_identifier, format_bytes, format_time
from common_gdal import getExtent, getSRS, getSpatialFilterFromGeometry
from rename_exchange import rename_exchange


def parseTilingScheme(scheme : list[Any]) -> tuple[osr.SpatialReference, ogr.Geometry|None]:
    """Get SRS and extent from MVT tiling scheme (crs, tile_origin_upper_left_x,
    tile_origin_upper_left_y, tile_dimension_zoom_0).

    When scheme is None, the standard WebMercator (EPSG:3857) tiling scheme is
    assumed and no extent is returned.  Raises BadConfiguration on a malformed
    scheme, and RuntimeError when the SRS is not projected or its axes'
    orientations don't determine a bounding box.
    """
    if scheme is None:
        srs = osr.SpatialReference()
        # standard WebMercator tiling scheme
        srs.ImportFromEPSG(3857)
        return srs, None
    if not isinstance(scheme, list) or len(scheme) != 4:
        raise BadConfiguration(f'Invalid tiling scheme: {scheme}')
    srs = getSRS(scheme[0])
    if not srs.IsProjected():
        raise RuntimeError('SRS is not projected: ' + srs.ExportToPrettyWkt())
    # derive the zoom-0 bounding box from the axes' orientations: scheme[1]
    # resp. scheme[2] is the upper-left tile origin, scheme[3] the zoom-0 tile
    # dimension (square tiles assumed)
    minX = minY = maxX = maxY = None
    for i in range(srs.GetAxesCount()):
        orientation = srs.GetAxisOrientation('PROJCS', i)
        if orientation == osr.OAO_North:
            assert minY is None and maxY is None
            maxY = scheme[2]
            minY = maxY - scheme[3]
        elif orientation == osr.OAO_South:
            assert minY is None and maxY is None
            minY = scheme[2]
            maxY = minY + scheme[3]
        elif orientation == osr.OAO_East:
            assert minX is None and maxX is None
            minX = scheme[1]
            maxX = minX + scheme[3]
        elif orientation == osr.OAO_West:
            assert minX is None and maxX is None
            maxX = scheme[1]
            minX = maxX - scheme[3]
    if minX is None or minY is None or maxX is None or maxY is None:
        raise RuntimeError('Unknown axes meaning for ' + srs.ExportToPrettyWkt())
    extent = (minX, minY, maxX, maxY)
    logging.debug('Parsed tiling scheme: extent=%s, srs="%s"', str(extent), srs.GetName())
    return srs, getExtent(extent, srs=srs)


def createMVT(drv : gdal.Driver, path : str,
              options : dict[str,str|int|float|None]|None = None,
              default_options : dict[str,Any]|None = None) -> gdal.Dataset:
    """Create and return the MVT file/directory (which must not exist).

    Options from default_options (config-file spelling, e.g. 'min-zoom') are
    mapped to GDAL MVT creation options (e.g. 'MINZOOM') and merged into
    options without overriding explicitly-set keys.

    NOTE: the default for options used to be a mutable {} which was mutated by
    the merge below, leaking merged defaults across calls; it is now None and
    normalized lazily.
    """
    # map config-file keys to GDAL MVT dataset creation option names
    default_map = {
        'min-zoom': 'MINZOOM',
        'max-zoom': 'MAXZOOM',
        'max-size': 'MAX_SIZE',
        'max-features': 'MAX_FEATURES',
        'simplification': 'SIMPLIFICATION',
        'simplification-max-zoom': 'SIMPLIFICATION_MAX_ZOOM',
        'tiling-scheme': 'TILING_SCHEME',
        'extent': 'EXTENT',
    }
    # merge options from the config file
    if default_options is not None:
        for k, v in default_options.items():
            if k not in default_map:
                logging.warning('No default map for key "%s", ignoring', k)
                continue
            if options is None:
                options = {}
            k = default_map[k]
            if k not in options:
                if isinstance(v, list):
                    # GDAL expects comma-separated values (e.g. for EXTENT)
                    v = ','.join(map(str, v))
                options[k] = v
    kwargs = { 'eType': gdal.GDT_Unknown }
    if options is not None:
        kwargs['options'] = opts = []
        for k, v in options.items():
            opts.append(k + '=' + str(v))
    logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs['eType'],
                  ', options=' + str(kwargs['options']) if 'options' in kwargs else '')
    # xsize/ysize/bands = 0 as this is a vector (not raster) dataset
    return drv.Create(path, 0, 0, 0, **kwargs)


# pylint: disable-next=too-many-branches
def exportSourceLayer(ds_src : gdal.Dataset, lyr_src : ogr.Layer, lyr_dst : ogr.Layer,
                      layerdef : dict[str,Any],
                      extent : ogr.Geometry|None = None) -> int:
    """Export a source layer, returning the number of features copied.

    Features are transformed to the destination SRS when needed, flattened to
    2D, optionally clipped to extent (features whose intersection is
    degenerate are skipped), and optionally reduced to their centroid when
    layerdef['transform-geometry'] == 'centroid'.  layerdef['where'] is
    appended as an SQL WHERE clause on the source.
    """
    # feature count before filtering (only when cheap to compute)
    count0 = -1
    if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
        count0 = lyr_src.GetFeatureCount(force=0)

    layername = lyr_src.GetName()
    srs_src = lyr_src.GetSpatialRef()
    if srs_src is None:
        raise RuntimeError(f'Source layer "{layername}" has no SRS')
    srs_dst = lyr_dst.GetSpatialRef()
    if srs_dst is None:
        raise RuntimeError(f'Destination layer "{lyr_dst.GetName()}" has no SRS')
    if srs_dst.IsSame(srs_src):
        logging.debug('Source and destination have the same SRS (%s), '
                      'skipping coordinate transformation', srs_dst.GetName())
        ct = None
    else:
        # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0
        # when possible, see apps/ogr2ogr_lib.cpp
        # pylint: disable=duplicate-code
        logging.debug('Creating transforming from source SRS (%s) to destination SRS (%s)',
                      srs_src.GetName(), srs_dst.GetName())
        ct = osr.CoordinateTransformation(srs_src, srs_dst)
        if ct is None:
            raise RuntimeError('Could not create transformation from source SRS '
                               f'({srs_src.GetName()}) to destination SRS ({srs_dst.GetName()})')

    # get geometry field of the source layer
    defn = lyr_src.GetLayerDefn()
    geomFieldCount = defn.GetGeomFieldCount()
    if geomFieldCount != 1:
        # warn but carry on with the first geometry field
        logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
    if geomFieldCount == 0:
        return 0
    geomField = defn.GetGeomFieldDefn(0)

    if extent is None:
        spatialFilter = None
        clip = None
    else:
        # transform extent to source SRS
        spatialFilter = getSpatialFilterFromGeometry(extent, srs_src)
        # ensure clipping geometries remain simple (mere rectangles)
        clip = getSpatialFilterFromGeometry(extent, srs_dst, fail_if_srs_not_equivalent = True)

    transform_geometry = layerdef.get('transform-geometry', None)
    columns = [ 'm.' + escape_identifier(lyr_src.GetFIDColumn()) ]
    geomFieldName_esc = escape_identifier(geomField.GetName())
    if transform_geometry is None:
        columns.append('m.' + geomFieldName_esc)
    elif transform_geometry == 'centroid':
        # let the source backend compute centroids (identifiers are escaped above)
        columns.append('ST_Centroid(m.' + geomFieldName_esc + ') AS ' + geomFieldName_esc)
    else:
        raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')

    query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(layername) + ' m'
    cond = layerdef.get('where', None)
    if cond is not None:
        # NOTE(review): cond comes from the (trusted) config file and is
        # interpolated verbatim into the SQL statement
        query += ' WHERE ' + cond.strip()
    logging.debug('ExecuteSQL(%s%s)', query,
                  '' if spatialFilter is None else ', spatialFilter=' + spatialFilter.ExportToWkt())
    lyr_src = ds_src.ExecuteSQL(query, spatialFilter=spatialFilter)
    try:
        count1 = -1
        if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
            count1 = lyr_src.GetFeatureCount(force=0)
        if count0 >= 0 and count1 >= 0:
            logging.debug('Source layer "%s" has %d features, of which %d are to be exported',
                          layername, count0, count1)

        defn_dst = lyr_dst.GetLayerDefn()
        eGType_dst = defn_dst.GetGeomType()
        # wkbUnknown destination accepts any geometry type, no forcing needed
        dGeomIsUnknown = ogr.GT_Flatten(eGType_dst) == ogr.wkbUnknown

        count = count_degenerates = 0
        feature = lyr_src.GetNextFeature()
        while feature is not None:
            # clone so the source feature's geometry is left untouched
            geom = feature.GetGeometryRef().Clone()
            if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE:
                raise RuntimeError('Could not apply coordinate transformation')
            geom.FlattenTo2D()

            if clip is not None:
                # GDAL generates tiles beyond the tiling scheme extents so we need to
                # clip manually
                dim = geom.GetDimension()
                geom = geom.Intersection(clip)
                if geom.GetDimension() != dim:
                    # degenerate intersection, skip feature
                    count_degenerates += 1
                    feature = lyr_src.GetNextFeature()
                    continue

            if not dGeomIsUnknown:
                geom = ogr.ForceTo(geom, eGType_dst)

            feature2 = ogr.Feature(defn_dst)
            feature2.SetGeometryDirectly(geom)
            feature2.SetFID(feature.GetFID())
            if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
                raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
            count += 1
            feature = lyr_src.GetNextFeature()
        logging.info('Exported %d features (%d degenerate skipped) to MVT layer "%s" from "%s"',
                     count, count_degenerates, lyr_dst.GetName(), layername)
    finally:
        # result sets from ExecuteSQL() must be released, not merely dereferenced
        ds_src.ReleaseResultSet(lyr_src)
        lyr_src = None
    return count


def list_files(top : str, dir_fd : Optional[int] = None, follow_symlinks : bool = False,
               ext : Optional[str] = None) -> Iterator[tuple[str,int]]:
    """Generator of filenames and dir_fds.

    Recursively walks top (relative to dir_fd when given) and yields
    (filename, directory_fd) pairs, optionally restricted to names with the
    given (lowercase) extension.  The yielded fd is only valid until the
    generator is advanced.
    """
    for _dirname, _, filenames, dir_fd2 in os.fwalk(top=top, dir_fd=dir_fd,
                                                    follow_symlinks=follow_symlinks):
        for filename in filenames:
            if ext is None or filename.lower().endswith(ext):
                yield filename, dir_fd2


def write_all(fd : int, buf : bytes) -> int:
    """Wrapper around os.write() that loops until the entire data is written.

    EAGAIN is retried (busy-looping on a non-blocking fd); any other error is
    re-raised.  Returns the number of bytes written, always len(buf).
    """
    offset = 0
    while offset < len(buf):
        try:
            n = os.write(fd, buf[offset:])
        except IOError as e:
            if e.errno != EAGAIN:
                raise
            #logging.debug('Got EAGAIN while trying to write')
        else:
            #logging.debug('Wrote %d bytes of %d', n, len(buf) - offset)
            offset += n
    return offset


def compress_brotli(path : str, suffix : str = '.br',
                    dir_fd : Optional[int] = None,
                    # TODO increase chunksize to see if helps with performance
                    chunksize : int = 65536,
                    oFlags : int = 0,
                    **kwargs) -> tuple[int, int]:
    """Compress a file to Brotli, returning a pair of original and compressed size.

    The compressed stream is written next to the source as path + suffix
    (mode 0644); extra oFlags (e.g. O_EXCL) are OR'ed into the open(2) flags,
    and kwargs are forwarded to brotli.Compressor().
    """
    compressor = brotli.Compressor(**kwargs)
    fd_in = os.open(path, O_RDONLY|O_CLOEXEC, dir_fd=dir_fd)
    size_in = size_out = 0
    try:
        # hard-code the mode to save an fstat(2) call
        fd_out = os.open(path + suffix, O_WRONLY|O_CREAT|O_CLOEXEC|oFlags,
                         mode=0o0644, dir_fd=dir_fd)
        try:
            buf_in = os.read(fd_in, chunksize)
            while len(buf_in) > 0:
                #logging.debug('Read %d bytes of %d', len(buf_in), chunksize)
                size_in += len(buf_in)
                size_out += write_all(fd_out, compressor.process(buf_in))
                buf_in = os.read(fd_in, chunksize)
            size_out += write_all(fd_out, compressor.finish())
        finally:
            os.close(fd_out)
    finally:
        os.close(fd_in)
    return size_in, size_out


# pylint: disable-next=too-many-branches, too-many-statements
def exportMVT(ds : gdal.Dataset,
              layers : dict[str,dict[str,Any]],
              tmpdir : Path|None,
              dst : Path,
              mvtname : str = 'mvt',
              drvname : str = 'MVT',
              default_options : dict[str,Any]|None = None,
              tile_extension : str = '.pbf',
              compress : bool = False) -> None:
    """Export some layers to MVT.

    Tiles are generated in a temporary directory (tmpdir when given — assumed
    pre-existing and protected by an exclusive lock — otherwise a fresh
    directory next to dst so the final rename stays on one filesystem),
    optionally Brotli-compressed alongside the raw tiles, then atomically
    swapped into place at dst.
    """
    drv = gdal.GetDriverByName(drvname)
    if drv is None:
        raise RuntimeError(f'Unknown GDAL driver for format "{drvname}"')
    # default_options may legitimately be None (its default); don't crash on .get()
    srs, extent = parseTilingScheme((default_options or {}).get('tiling-scheme', None))

    # map MVT layer name -> (source layer name, export definition), and build
    # the per-layer CONF dict handed to the MVT driver
    export_layers = {}
    mvtconf = {}
    for layername, layerdef in layers.items():
        exportdef = layerdef.get('publish', None)
        if exportdef is None:
            raise RuntimeError(f'Layer "{layername}" has no publication definition')
        # normalize the shorthand forms (string or list of strings)
        if isinstance(exportdef, str):
            exportdef = { exportdef:{} }
        elif isinstance(exportdef, list):
            exportdef = { l:{} for l in exportdef }
        for export_layername, export_layerdef in exportdef.items():
            if export_layername in export_layers:
                raise RuntimeError(f'Duplicate definition for {export_layername}')
            x = {}
            for k in ('target_name', 'minzoom', 'maxzoom'):
                if k in export_layerdef:
                    x[k] = export_layerdef[k]
            mvtconf[export_layername] = x
            export_layers[export_layername] = (layername, export_layerdef)

    if tmpdir is None:
        # use a temporary directory in the parent to make sure we can
        # atomically rename/exchange directories
        tmpdir2 = tempfile.mkdtemp(prefix='.tmp.mvt-', dir=str(dst.parent))
        logging.debug('Using "%s" as temporary directory for MVT', tmpdir2)
    else:
        # assume it already exists (it's shared by all invocation of the program)
        tmpdir2 = str(tmpdir)

    # computed before the try block so the finally clause can always refer to it
    tmpname = '.' + mvtname + '-tmp'
    dir_fd = os.open(tmpdir2, O_RDONLY|O_CLOEXEC|O_DIRECTORY)
    try:
        if tmpdir is not None:
            logging.debug('flock("%s", LOCK_EX)', tmpdir2)
            flock(dir_fd, LOCK_EX)
            with os.scandir(dir_fd) as it:
                # leftovers from an earlier crashed run, presumably
                if next(it, None) is not None:
                    logging.warning('Temporary directory "%s" exists and is not empty',
                                    str(tmpdir))

        # createMVT() crashes if mvtname exists so that's also what we want here
        os.mkdir(tmpname, mode=0o700, dir_fd=dir_fd)

        start = time_monotonic()
        # path-ify dir_fd for APIs (GDAL) that can't take a directory fd
        basedir = Path(f'/proc/self/fd/{dir_fd}')
        # gdal.create() raises an exception when path exists, and that's what we want
        dso = createMVT(drv, path=str(basedir.joinpath(mvtname)),
                        default_options=default_options,
                        options = {
                            'FORMAT': 'DIRECTORY',
                            # OpenLayers doesn't support compressed tiles, so instead
                            # we use brotli_static to let the browser transparently
                            # uncompress the responses
                            'COMPRESS': 'NO',
                            'TYPE': 'overlay',
                            'BUFFER': 32,
                            'TILE_EXTENSION': tile_extension.removeprefix('.'),
                            'TEMPORARY_DB': str(basedir.joinpath(tmpname).joinpath('tmp-mvt.db')),
                            'CONF': json.dumps(mvtconf, ensure_ascii=False, separators=(',',':')),
                        })

        layer_count = feature_count = 0
        for layername, (layername_src, layerdef) in export_layers.items():
            # create destination layer
            lyr_src = ds.GetLayerByName(layername_src)
            if lyr_src is None:
                raise RuntimeError(f'Source dataset has no layer named "{layername_src}"')
            transform_geometry = layerdef.get('transform-geometry', None)
            if transform_geometry is None:
                geom_type = ogr.GT_Flatten(lyr_src.GetGeomType())
            elif transform_geometry == 'centroid':
                geom_type = ogr.wkbPoint
            else:
                raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')
            logging.debug('Creating destination MVT layer "%s" with SRS %s and type %s',
                          layername, srs.GetName(), ogr.GeometryTypeToName(geom_type))
            lyr_dst = dso.CreateLayer(layername, srs=srs, geom_type=geom_type)
            if lyr_dst is None:
                raise RuntimeError(f'Could not create destination layer "{layername}"')
            # TODO: GDAL exports features to a temporary SQLite database even though the source
            # is PostGIS hence is able to generate MVT with ST_AsMVT(). Letting PostGIS generate
            # tiles is likely to speed up things.
            feature_count += exportSourceLayer(ds, lyr_src, lyr_dst, layerdef, extent=extent)
            layer_count += 1
            lyr_dst = None
            lyr_src = None
        # closing the dataset flushes the tiles to disk
        dso = None
        start2 = time_monotonic()
        logging.info('Exported %d features to %d MVT layers in %s',
                     feature_count, layer_count, format_time(start2 - start))

        # gather tile statistics, optionally compressing each tile on the way
        tile_count = size_tot = size_tot_z = 0
        size_min = size_max = size_min_z = size_max_z = None
        for filename, dir_fd2 in list_files(top=mvtname, dir_fd=dir_fd,
                                            follow_symlinks=False, ext=tile_extension):
            tile_count += 1
            if not compress:
                szi = os.stat(filename, dir_fd=dir_fd2).st_size
            else:
                # TODO: benchmark brotli quality to trade-off compression ratio vs. execution time
                szi, szo = compress_brotli(filename, dir_fd=dir_fd2, oFlags=O_EXCL,
                                           mode=brotli.MODE_GENERIC, quality=5)
                size_tot_z += szo
                if size_min_z is None or szo < size_min_z:
                    size_min_z = szo
                if size_max_z is None or size_max_z < szo:
                    size_max_z = szo
            size_tot += szi
            if size_min is None or szi < size_min:
                size_min = szi
            if size_max is None or size_max < szi:
                size_max = szi

        if tile_count > 0:
            logging.info('Tile count: %d [min=%s, max=%s, sum=%s, avg=%s]',
                         tile_count, format_bytes(size_min), format_bytes(size_max),
                         format_bytes(size_tot), format_bytes(round(size_tot/tile_count)))
            if compress:
                logging.info('Compression ratio: %.1f%% in %s [min=%s, max=%s, sum=%s, avg=%s]',
                             100. * (1. - size_tot_z/size_tot),
                             format_time(time_monotonic() - start2),
                             format_bytes(size_min_z), format_bytes(size_max_z),
                             format_bytes(size_tot_z),
                             format_bytes(round(size_tot_z/tile_count)))

        try:
            # OpenLayers doesn't make use of that file so delete it
            os.unlink(str(Path(mvtname).joinpath('metadata.json')), dir_fd=dir_fd)
        except FileNotFoundError:
            pass

        try:
            # atomically exchange paths
            rename_exchange(mvtname, dst, olddirfd=dir_fd)
        except FileNotFoundError:
            # dst doesn't exist, use normal os.rename() instead
            os.rename(mvtname, dst, src_dir_fd=dir_fd)
    finally:
        # drop GDAL/OSR handles before removing the files backing them
        lyr_dst = None
        dso = None
        srs = None
        # after a successful exchange, mvtname holds the previous dst content
        for p in (tmpname, mvtname):
            if os.access(p, F_OK, dir_fd=dir_fd, follow_symlinks=False):
                logging.debug('rmtree("%s/%s")', tmpdir2, p)
                shutil.rmtree(p, dir_fd=dir_fd)
        try:
            os.close(dir_fd)
        except (OSError, ValueError):
            logging.exception('Could not close directory')
        if tmpdir is None:
            # the private mkdtemp() directory is ours to remove
            logging.debug('rmdir("%s")', tmpdir2)
            os.rmdir(tmpdir2)