#!/usr/bin/python3

#----------------------------------------------------------------------
# Backend utilities for the Klimatanalys Norr project (create MVT tiles)
# Copyright © 2024-2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme

from os import O_RDONLY, O_WRONLY, O_CREAT, O_EXCL, O_CLOEXEC, O_DIRECTORY, F_OK
import os
from errno import EAGAIN
import json
import logging
from pathlib import Path
import shutil
import tempfile
from typing import Any, Iterator, Optional
from time import monotonic as time_monotonic

import brotli
from osgeo import gdal, ogr, osr

from common import BadConfiguration, escape_identifier, format_bytes, format_time
from common_gdal import getExtent, getSRS, getSpatialFilterFromGeometry
from rename_exchange import rename_exchange

def parseTilingScheme(scheme : list[Any]) -> tuple[osr.SpatialReference, ogr.Geometry|None]:
    """Get SRS and extent from MVT tiling scheme (crs, tile_origin_upper_left_x,
    tile_origin_upper_left_y, tile_dimension_zoom_0).

    Returns a pair (srs, extent-geometry); the extent is None when no scheme
    is given (standard WebMercator tiling is assumed in that case).
    Raises BadConfiguration on a malformed scheme, RuntimeError when the SRS
    is not projected or its axes orientation cannot be interpreted."""
    if scheme is None:
        srs = osr.SpatialReference()
        # standard WebMercator tiling scheme
        srs.ImportFromEPSG(3857)
        return srs, None
    if not isinstance(scheme, list) or len(scheme) != 4:
        raise BadConfiguration(f'Invalid tiling scheme: {scheme}')
    srs = getSRS(scheme[0])
    if not srs.IsProjected():
        raise RuntimeError('SRS is not projected: ' + srs.ExportToPrettyWkt())
    # derive the zoom-0 bounding box from the tile origin (upper-left corner)
    # and tile dimension, taking each axis' orientation into account
    minX = minY = maxX = maxY = None
    for i in range(srs.GetAxesCount()):
        orientation = srs.GetAxisOrientation('PROJCS', i)
        if orientation == osr.OAO_North:
            assert minY is None and maxY is None
            maxY = scheme[2]
            minY = maxY - scheme[3]
        elif orientation == osr.OAO_South:
            assert minY is None and maxY is None
            minY = scheme[2]
            maxY = minY + scheme[3]
        elif orientation == osr.OAO_East:
            assert minX is None and maxX is None
            minX = scheme[1]
            maxX = minX + scheme[3]
        elif orientation == osr.OAO_West:
            assert minX is None and maxX is None
            maxX = scheme[1]
            minX = maxX - scheme[3]
    if minX is None or minY is None or maxX is None or maxY is None:
        raise RuntimeError('Unknown axes meaning for ' + srs.ExportToPrettyWkt())
    extent = (minX, minY, maxX, maxY)
    logging.debug('Parsed tiling scheme: extent=%s, srs="%s"', str(extent), srs.GetName())
    return srs, getExtent(extent, srs=srs)

# pylint: disable-next=dangerous-default-value
def createMVT(drv : gdal.Driver, path : str,
              options : dict[str,str|int|float|None] = {},
              default_options : dict[str,Any]|None = None) -> gdal.Dataset:
    """Create and return the MVT file/directory (which must not exist).

    'options' are driver creation options; 'default_options' come from the
    config file (keys in kebab-case) and are merged in without overriding
    explicitly-set options."""
    # map config-file keys to GDAL MVT driver creation-option names
    default_map = {
        'min-zoom': 'MINZOOM',
        'max-zoom': 'MAXZOOM',
        'max-size': 'MAX_SIZE',
        'max-features': 'MAX_FEATURES',
        'simplification': 'SIMPLIFICATION',
        'simplification-max-zoom': 'SIMPLIFICATION_MAX_ZOOM',
        'tiling-scheme': 'TILING_SCHEME',
        'extent': 'EXTENT',
    }
    # Work on a copy: the original code mutated 'options' in place, which
    # corrupted the shared default value ({}) across calls and clobbered
    # caller-supplied dicts.
    if options is not None:
        options = dict(options)
    # merge options from the config file
    if default_options is not None:
        for k, v in default_options.items():
            if k not in default_map:
                logging.warning('No default map for key "%s", ignoring', k)
                continue
            if options is None:
                options = {}
            k = default_map[k]
            if k not in options:
                if isinstance(v, list):
                    # the driver expects comma-separated values
                    v = ','.join(map(str, v))
                options[k] = v
    kwargs = { 'eType': gdal.GDT_Unknown }
    if options is not None:
        kwargs['options'] = opts = []
        for k, v in options.items():
            opts.append(k + '=' + str(v))
    logging.debug('Create(%s, %s, eType=%s%s)', drv.ShortName, path, kwargs['eType'],
                  ', options=' + str(kwargs['options']) if 'options' in kwargs else '')
    return drv.Create(path, 0, 0, 0, **kwargs)

# pylint: disable-next=too-many-branches
def exportSourceLayer(ds_src : gdal.Dataset, lyr_src : ogr.Layer, lyr_dst : ogr.Layer,
                      layerdef : dict[str,Any],
                      extent : ogr.Geometry|None = None) -> int:
    """Export a source layer to the destination MVT layer, optionally clipped
    to 'extent' and/or filtered/transformed per 'layerdef'.

    Returns the number of features exported.  Raises RuntimeError when either
    layer lacks an SRS or a feature cannot be transferred, and
    BadConfiguration on an unsupported geometry transformation."""
    count0 = -1
    if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
        count0 = lyr_src.GetFeatureCount(force=0)

    layername = lyr_src.GetName()
    srs_src = lyr_src.GetSpatialRef()
    if srs_src is None:
        raise RuntimeError(f'Source layer "{layername}" has no SRS')
    srs_dst = lyr_dst.GetSpatialRef()
    if srs_dst is None:
        raise RuntimeError(f'Destination layer "{lyr_dst.GetName()}" has no SRS')
    if srs_dst.IsSame(srs_src):
        logging.debug('Source and destination have the same SRS (%s), '
                      'skipping coordinate transformation', srs_dst.GetName())
        ct = None
    else:
        # TODO Use GetSupportedSRSList() and SetActiveSRS() with GDAL ≥3.7.0
        # when possible, see apps/ogr2ogr_lib.cpp
        # pylint: disable=duplicate-code
        logging.debug('Creating transformation from source SRS (%s) to destination SRS (%s)',
                      srs_src.GetName(), srs_dst.GetName())
        ct = osr.CoordinateTransformation(srs_src, srs_dst)
        if ct is None:
            raise RuntimeError('Could not create transformation from source SRS '
                               f'({srs_src.GetName()}) to destination SRS ({srs_dst.GetName()})')

    # get geometry field of the source layer
    defn = lyr_src.GetLayerDefn()
    geomFieldCount = defn.GetGeomFieldCount()
    if geomFieldCount != 1:
        logging.warning('Source layer "%s" has %d != 1 geometry fields', layername, geomFieldCount)
        if geomFieldCount == 0:
            return 0
    geomField = defn.GetGeomFieldDefn(0)

    if extent is None:
        spatialFilter = None
    else:
        # transform extent to source SRS
        spatialFilter = getSpatialFilterFromGeometry(extent, srs_src)

    transform_geometry = layerdef.get('transform-geometry', None)
    columns = [ 'm.' + escape_identifier(lyr_src.GetFIDColumn()) ]
    geomFieldName_esc = escape_identifier(geomField.GetName())
    if transform_geometry is None:
        columns.append('m.' + geomFieldName_esc)
    elif transform_geometry == 'centroid':
        columns.append('ST_Centroid(m.' + geomFieldName_esc + ') AS ' + geomFieldName_esc)
    else:
        raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')

    query = 'SELECT ' + ', '.join(columns) + ' FROM ' + escape_identifier(layername) + ' m'
    cond = layerdef.get('where', None)
    if cond is not None:
        query += ' WHERE ' + cond.strip()
    logging.debug('ExecuteSQL(%s%s)', query,
                  '' if spatialFilter is None else ', spatialFilter=' + spatialFilter.ExportToWkt())
    lyr_src = ds_src.ExecuteSQL(query, spatialFilter=spatialFilter)
    try:
        count1 = -1
        if lyr_src.TestCapability(ogr.OLCFastFeatureCount):
            count1 = lyr_src.GetFeatureCount(force=0)
        if count0 >= 0 and count1 >= 0:
            logging.debug('Source layer "%s" has %d features, of which %d are to be exported',
                          layername, count0, count1)

        feature_count = 0
        defn_dst = lyr_dst.GetLayerDefn()
        feature = lyr_src.GetNextFeature()
        while feature is not None:
            geom = feature.GetGeometryRef()
            if geom is None:
                # fail loudly with a clear message rather than an opaque
                # AttributeError from .Clone() on None
                raise RuntimeError(f'Source feature #{feature.GetFID()} has no geometry')
            geom = geom.Clone()
            if ct is not None and geom.Transform(ct) != ogr.OGRERR_NONE:
                raise RuntimeError('Could not apply coordinate transformation')
            geom.FlattenTo2D()

            feature2 = ogr.Feature(defn_dst)
            feature2.SetGeometryDirectly(geom)
            feature2.SetFID(feature.GetFID())

            if lyr_dst.CreateFeature(feature2) != ogr.OGRERR_NONE:
                raise RuntimeError(f'Could not transfer source feature #{feature.GetFID()}')
            feature_count += 1
            feature = lyr_src.GetNextFeature()
        logging.info('Exported %d features to MVT layer "%s" from "%s"',
                     feature_count, lyr_dst.GetName(), layername)
    finally:
        ds_src.ReleaseResultSet(lyr_src)
        lyr_src = None
    return feature_count

def list_files(top : str,
               dir_fd : Optional[int] = None,
               follow_symlinks : bool = False,
               ext : Optional[str] = None) -> Iterator[tuple[str,int]]:
    """Generator of (filename, dir_fd) pairs for all files below 'top',
    optionally restricted to those whose lowercased name ends with 'ext'.
    The yielded dir_fd is only valid until the generator is resumed."""
    for _dirname, _, filenames, dir_fd2 in os.fwalk(top=top, dir_fd=dir_fd,
                                                    follow_symlinks=follow_symlinks):
        for filename in filenames:
            if ext is None or filename.lower().endswith(ext):
                yield filename, dir_fd2

def write_all(fd : int, buf : bytes) -> int:
    """Wrapper around os.write() that loops until the entire data is written.
    Retries on EAGAIN; returns the number of bytes written (== len(buf))."""
    offset = 0
    while offset < len(buf):
        try:
            n = os.write(fd, buf[offset:])
        except IOError as e:
            if e.errno != EAGAIN:
                # bare 'raise' re-raises with the original traceback intact
                raise
            #logging.debug('Got EAGAIN while trying to write')
        else:
            #logging.debug('Wrote %d bytes of %d', n, len(buf) - offset)
            offset += n
    return offset

def compress_brotli(path : str,
                    suffix : str = '.br',
                    dir_fd : Optional[int] = None,
                    # TODO increase chunksize to see if helps with performance
                    chunksize : int = 65536,
                    oFlags : int = 0,
                    **kwargs) -> tuple[int, int]:
    """Compress a file to Brotli, returning a pair of original and compressed size.

    The compressed copy is written next to 'path' with 'suffix' appended;
    'oFlags' is OR'ed into the open(2) flags (e.g. O_EXCL) and extra keyword
    arguments are forwarded to brotli.Compressor()."""
    compressor = brotli.Compressor(**kwargs)
    fd_in = os.open(path, O_RDONLY|O_CLOEXEC, dir_fd=dir_fd)
    size_in = size_out = 0
    try:
        # hard-code the mode to save an fstat(2) call
        fd_out = os.open(path + suffix, O_WRONLY|O_CREAT|O_CLOEXEC|oFlags,
                         mode=0o0644, dir_fd=dir_fd)
        try:
            buf_in = os.read(fd_in, chunksize)
            while len(buf_in) > 0:
                #logging.debug('Read %d bytes of %d', len(buf_in), chunksize)
                size_in += len(buf_in)
                size_out += write_all(fd_out, compressor.process(buf_in))
                buf_in = os.read(fd_in, chunksize)
            size_out += write_all(fd_out, compressor.finish())
        finally:
            os.close(fd_out)
    finally:
        os.close(fd_in)
    return size_in, size_out

# pylint: disable-next=too-many-branches, too-many-statements
def exportMVT(ds : gdal.Dataset,
              layers : dict[str,dict[str,Any]],
              dst : Path,
              drvname : str = 'MVT',
              default_options : dict[str,Any]|None = None,
              tile_extension : str = '.pbf',
              compress : bool = False) -> None:
    """Export some layers to MVT.

    Tiles are generated in a sibling temporary directory which is atomically
    exchanged with (or renamed to) 'dst' on success; when 'compress' is set,
    each tile additionally gets a Brotli-compressed '.br' sibling (for
    brotli_static).  Raises RuntimeError on driver/layer errors and
    BadConfiguration on invalid publication definitions."""
    drv = gdal.GetDriverByName(drvname)
    if drv is None:
        raise RuntimeError(f'Unknown GDAL driver for format "{drvname}"')

    # guard against default_options=None (the declared default) which the
    # original .get() call would have crashed on with an AttributeError
    srs, extent = parseTilingScheme(None if default_options is None
                                    else default_options.get('tiling-scheme', None))

    # normalize the per-layer 'publish' definitions and build the driver's
    # per-layer configuration (target_name/minzoom/maxzoom)
    export_layers = {}
    mvtconf = {}
    for layername, layerdef in layers.items():
        exportdef = layerdef.get('publish', None)
        if exportdef is None:
            raise RuntimeError(f'Layer "{layername}" has no publication definition')
        if isinstance(exportdef, str):
            exportdef = { exportdef:{} }
        elif isinstance(exportdef, list):
            exportdef = { l:{} for l in exportdef }
        for export_layername, export_layerdef in exportdef.items():
            if export_layername in export_layers:
                raise RuntimeError(f'Duplicate definition for {export_layername}')
            x = {}
            for k in ('target_name', 'minzoom', 'maxzoom'):
                if k in export_layerdef:
                    x[k] = export_layerdef[k]
            mvtconf[export_layername] = x
            export_layers[export_layername] = (layername, export_layerdef)

    # use a sibling temporary directory to make sure we can atomically rename/exchange
    # directories
    tmpdir = tempfile.mkdtemp(prefix='.tmp.mvt-', dir=dst.parent)
    logging.debug('Using "%s" as temporary directory for MVT', tmpdir)

    mvtname = 'mvt'
    dbname = 'db'
    dir_fd = os.open(tmpdir, O_RDONLY|O_CLOEXEC|O_DIRECTORY)
    try:
        start = time_monotonic()
        os.mkdir(dbname, mode=0o700, dir_fd=dir_fd)
        # address the temporary directory through the held fd so later path
        # operations are immune to it being moved/replaced
        basedir = Path(f'/proc/self/fd/{dir_fd}')
        dso = createMVT(drv, path=str(basedir.joinpath(mvtname)),
            default_options=default_options,
            options = {
                'FORMAT': 'DIRECTORY',
                # OpenLayers doesn't support compressed tiles, so instead
                # we use brotli_static to let the browser transparently
                # uncompress the responses
                'COMPRESS': 'NO',
                'TYPE': 'overlay',
                'BUFFER': 32,
                'TILE_EXTENSION': tile_extension.removeprefix('.'),
                'TEMPORARY_DB': str(basedir.joinpath(dbname).joinpath('tmp.db')),
                'CONF': json.dumps(mvtconf, ensure_ascii=False, separators=(',',':')),
            })

        layer_count = feature_count = 0
        for layername, (layername_src, layerdef) in export_layers.items():
            # create destination layer
            lyr_src = ds.GetLayerByName(layername_src)
            if lyr_src is None:
                raise RuntimeError(f'Source dataset has no layer named "{layername_src}"')
            transform_geometry = layerdef.get('transform-geometry', None)
            if transform_geometry is None:
                geom_type = ogr.GT_Flatten(lyr_src.GetGeomType())
            elif transform_geometry == 'centroid':
                geom_type = ogr.wkbPoint
            else:
                raise BadConfiguration(f'Unsupported geometry transformation: {transform_geometry}')
            logging.debug('Creating destination MVT layer "%s" with SRS %s and type %s',
                          layername, srs.GetName(), ogr.GeometryTypeToName(geom_type))
            lyr_dst = dso.CreateLayer(layername, srs=srs, geom_type=geom_type)
            if lyr_dst is None:
                raise RuntimeError(f'Could not create destination layer "{layername}"')

            # TODO: GDAL exports features to a temporary SQLite database even though the source
            # is PostGIS hence is able to generate MVT with ST_AsMVT(). Letting PostGIS generate
            # tiles is likely to speed up things.
            feature_count += exportSourceLayer(ds, lyr_src, lyr_dst, layerdef, extent=extent)
            layer_count += 1
            lyr_dst = None
            lyr_src = None

        dso = None # close MVT dataset
        start2 = time_monotonic()
        logging.info('Exported %d features to %d MVT layers in %s',
                     feature_count, layer_count, format_time(start2 - start))

        # stat (and optionally Brotli-compress) every generated tile
        tile_count = size_tot = size_tot_z = 0
        size_min = size_max = size_min_z = size_max_z = None
        for filename, dir_fd2 in list_files(top=mvtname, dir_fd=dir_fd,
                                            follow_symlinks=False, ext=tile_extension):
            tile_count += 1
            if not compress:
                szi = os.stat(filename, dir_fd=dir_fd2).st_size
            else:
                # TODO: benchmark brotli quality to trade-off compression ratio vs. execution time
                szi, szo = compress_brotli(filename, dir_fd=dir_fd2, oFlags=O_EXCL,
                                           mode=brotli.MODE_GENERIC, quality=5)
                size_tot_z += szo
                if size_min_z is None or szo < size_min_z:
                    size_min_z = szo
                if size_max_z is None or size_max_z < szo:
                    size_max_z = szo
            size_tot += szi
            if size_min is None or szi < size_min:
                size_min = szi
            if size_max is None or size_max < szi:
                size_max = szi

        if tile_count > 0:
            logging.info('Tile count: %d [min=%s, max=%s, sum=%s, avg=%s]',
                         tile_count, format_bytes(size_min), format_bytes(size_max),
                         format_bytes(size_tot), format_bytes(round(size_tot/tile_count)))
            # guard size_tot > 0: all-empty tiles would otherwise divide by zero
            if compress and size_tot > 0:
                logging.info('Compression ratio: %.1f%% in %s [min=%s, max=%s, sum=%s, avg=%s]',
                             100. * (1. - size_tot_z/size_tot),
                             format_time(time_monotonic() - start2),
                             format_bytes(size_min_z), format_bytes(size_max_z),
                             format_bytes(size_tot_z), format_bytes(round(size_tot_z/tile_count)))

        try:
            # OpenLayers doesn't make use of that file so delete it
            os.unlink(str(Path(mvtname).joinpath('metadata.json')), dir_fd=dir_fd)
        except FileNotFoundError:
            pass

        try:
            # atomically exchange paths
            rename_exchange(mvtname, dst, olddirfd=dir_fd)
        except FileNotFoundError:
            # dst doesn't exist, use normal os.rename() instead
            os.rename(mvtname, dst, src_dir_fd=dir_fd)
    finally:
        lyr_dst = None
        dso = None # close MVT dataset
        srs = None
        # clean up whatever is left in the temporary directory (after a
        # successful exchange, mvtname holds the previous dst content)
        for p in (dbname, mvtname):
            if os.access(p, F_OK, dir_fd=dir_fd, follow_symlinks=False):
                logging.debug('rmtree("%s/%s")', tmpdir, p)
                shutil.rmtree(p, dir_fd=dir_fd)
        logging.debug('rmdir("%s")', tmpdir)
        os.rmdir(tmpdir)
        try:
            os.close(dir_fd)
        except (OSError, ValueError):
            logging.exception('Could not close directory')