#!/usr/bin/python3 #---------------------------------------------------------------------- # Backend utilities for the Klimatanalys Norr project (Cloud Optimized GeoTIFF generator) # Copyright © 2025 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- # pylint: disable=invalid-name, missing-module-docstring from os import O_RDONLY, O_CLOEXEC, O_DIRECTORY, F_OK import os import sys import logging from pathlib import Path from typing import Any import shutil import tempfile from time import time_ns from osgeo import gdal, ogr, osr from osgeo.gdalconst import ( OF_RASTER as GDAL_OF_RASTER, OF_READONLY as GDAL_OF_READONLY, OF_VERBOSE_ERROR as GDAL_OF_VERBOSE_ERROR, ) from common import BadConfiguration from import_source import importSource0 from common_gdal import ( gdalSetOpenExArgs, gdalVersionMin, getSpatialFilterFromGeometry, ) from export_mvt import exportMetadata, getLayerMetadata from rename_exchange import rename_exchange def processRaster(layername : str, layerdef : dict[str,Any], sources : dict[str,Any], license_info: dict[str,str|dict[str,str]], last_modified : dict[str,int], dst : Path, cachedir : Path|None = None, extent : ogr.Geometry|None = None, compress_metadata : bool = False) -> None: """Process a raster file.""" source = layerdef['sources'] assert layerdef['type'] == 'raster' if len(source) != 1: raise BadConfiguration(f'{layername} has {len(source)} != 1 sources') publish = layerdef.get('publish', None) if publish is None or len(publish) < 1: raise BadConfiguration(f'{layername} has no export definition') if not isinstance(publish, dict) or not all(isinstance(l, str) for l in publish.values()): raise BadConfiguration(f'{layername} has invalid export definition {publish}') source = source[0] if sys.stderr.isatty(): from tqdm import tqdm # pylint: disable=import-outside-toplevel progress = tqdm else: progress = None if len(last_modified) < 1: last_modified_ns = None else: last_modified_ns = max(last_modified.values()) * 1000000 try: st = os.stat(str(dst)) if last_modified_ns <= st.st_mtime_ns: logging.info('Output directory "%s" is up to date, skipping', str(dst)) return except (OSError, ValueError): #logging.warning('Could not stat(%s)', str(dst)) pass # use a sibling temporary directory to make sure we can atomically rename/exchange # directories tmpdir = tempfile.mkdtemp(prefix='.tmp.' + dst.name + '-', dir=dst.parent) logging.debug('Using "%s" as temporary directory for MVT', tmpdir) dir_fd = os.open(tmpdir, O_RDONLY|O_CLOEXEC|O_DIRECTORY) try: creation_time = time_ns() os.mkdir(dst.name, mode=0o755, dir_fd=dir_fd) source['import'] |= { '_progress': progress, '_dest': Path(f'/proc/self/fd/{dir_fd}').joinpath(dst.name).joinpath(dst.name + '.tiff') } importSource0(None, **source['source'], args=source['import'], cachedir=cachedir, extent=extent, callback=_processRaster2) exportMetadata(basedir=Path(dst.name), data=getLayerMetadata({str(i):layerdef | {'description':desc} for i,desc in publish.items()}, sources=sources, license_info=license_info, last_modified=last_modified, last_updated=creation_time // 1000000), dir_fd=dir_fd, # pylint: disable=duplicate-code compress=compress_metadata) if last_modified_ns is not None: os.utime(dst.name, ns=(last_modified_ns, last_modified_ns), dir_fd=dir_fd, follow_symlinks=False) try: # atomically exchange paths rename_exchange(dst.name, dst, olddirfd=dir_fd) except FileNotFoundError: # dst doesn't exist, use normal os.rename() instead os.rename(dst.name, dst, src_dir_fd=dir_fd) finally: if progress is not None and '_pbar' in source['import']: source['import'].pop('_pbar').close() if os.access(dst.name, F_OK, dir_fd=dir_fd, follow_symlinks=False): logging.debug('rmtree("%s/%s")', tmpdir, dst.name) shutil.rmtree(dst.name, dir_fd=dir_fd) logging.debug('rmdir("%s")', tmpdir) os.rmdir(tmpdir) try: os.close(dir_fd) # pylint: disable=duplicate-code except (OSError, ValueError): logging.exception('Could not close directory') def _processRaster2(_ : None, path : str, args : dict[str,Any], basedir : Path|None, extent : ogr.Geometry|None) -> gdal.Dataset: kwargs, _ = gdalSetOpenExArgs(args, flags=GDAL_OF_RASTER|GDAL_OF_READONLY|GDAL_OF_VERBOSE_ERROR) path2 = path if basedir is None else str(basedir.joinpath(path)) logging.debug('OpenEx(%s, %s)', path2, str(kwargs)) ds = gdal.OpenEx(path2, **kwargs) # pylint: disable=duplicate-code if ds is None: raise RuntimeError(f'Could not open {path2}') if ds.RasterCount != 1: raise NotImplementedError(f'Input raster {path2} has {ds.RasterCount} != 1 bands') rb = ds.GetRasterBand(1) gt = ds.GetGeoTransform() xs = ds.RasterXSize ys = ds.RasterYSize srs = ds.GetSpatialRef() srs.SetAxisMappingStrategy(gdal.osr.OAMS_TRADITIONAL_GIS_ORDER) # force x,y ulx, uly = gdal.ApplyGeoTransform(gt, 0, 0) lrx, lry = gdal.ApplyGeoTransform(gt, xs, ys) assert ulx <= lrx assert uly >= lry extent = getSpatialFilterFromGeometry(extent, srs) ct = osr.CoordinateTransformation(extent.GetSpatialReference(), srs) extent = extent.GetEnvelope() ulxy = ct.TransformPoint(extent[0], extent[3]) lrxy = ct.TransformPoint(extent[1], extent[2]) assert ulxy[0] <= lrxy[0] assert ulxy[1] >= lrxy[1] ulx2 = max(ulx, ulxy[0]) uly2 = min(uly, ulxy[1]) lrx2 = min(lrx, lrxy[0]) lry2 = max(lry, lrxy[1]) assert ulx2 < lrx2 assert lry2 < uly2 # don't care about overview here, GDAL will take the ceiling when sizing # (the source width is not even disible by 2) r = (lrx2 - ulx2) % abs(gt[1]) if r != 0: # extend X boundaries to preserve xres d = abs(gt[1]) - r if ulxy[0] < ulx: ulx2 -= d else: lrx2 += r assert (lrx2 - ulx2) % abs(gt[1]) == 0 r = (uly2 - lry2) % abs(gt[5]) if r != 0: # extend Y boundaries to preserve yres d = abs(gt[5]) - r if lrxy[1] < lry: uly2 += r else: lry2 -= d assert (uly2 - lry2) % abs(gt[5]) == 0 # see https://gdal.org/en/stable/drivers/raster/cog.html creationOptions = [ 'BLOCKSIZE=256', 'COMPRESS=LZW', 'RESAMPLING=NEAREST', 'OVERVIEWS=IGNORE_EXISTING', ] if (rb.GetColorInterpretation() in (gdal.GCI_PaletteIndex, gdal.GCI_GrayIndex) and rb.DataType == gdal.GDT_Byte): # 8-bit gray, assume a palette so don't interpolate creationOptions.append('RESAMPLING=NEAREST') if gdalVersionMin(maj=3, min=11): creationOptions.append('INTERLEAVE=BAND') if gdalVersionMin(maj=3, min=8): creationOptions.append('STATISTICS=YES') warpOptions = { 'format': 'COG', # preserve source SRS and resolution 'outputBounds': (ulx2, lry2, lrx2, uly2), 'setColorInterpretation': True, 'creationOptions': creationOptions, } if args.get('_progress', None) is None: callback = pbar = None else: callback = _gdal_callback pbar = args['_pbar'] = args['_progress']( total=100, leave=False, bar_format='{l_bar}{bar}| [{elapsed}<{remaining}]', ) logging.debug('warp(%s, ds, %s)', args['_dest'], ', '.join([str(k) + '=' + (f'\'{v}\'' if isinstance(v,str) else str(v)) for k,v in warpOptions.items()])) return gdal.Warp(args['_dest'], ds, **warpOptions, callback=callback, callback_data=pbar, ) def _gdal_callback(info, _message, pbar): pbar.update(info * 100 - pbar.n)