#!/usr/bin/python3

#----------------------------------------------------------------------
# Webmap CGI (Common Gateway Interface) for the Klimatanalys Norr project
# Copyright © 2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme
# pylint: enable=invalid-name

import sys
from os import path as os_path
from json import load as json_load, JSONDecodeError
import logging
from typing import Final, Iterator
import atexit

from psycopg import connect, Cursor # pylint: disable=import-error

import common

def get_table_map() -> dict[tuple[str, str], str]:
    """Get mapping of pairs (MVT group name, layername) to table name.

    The mapping is built from the configuration returned by
    common.load_config(): for each entry of its 'layer-groups' section,
    the layers selected by common.layers_in_group() are inspected, and
    each name listed under a layer's 'publish' key (a single string or a
    list of strings) becomes a key (group name, published name) mapped
    to the layer's own name.  Layers without 'publish' key are skipped.

    Raises RuntimeError if two layers publish the same name within the
    same group.
    """
    ret = {}
    config = common.load_config()
    layer_groups = config.get('layer-groups', {})
    layers = config.get('layers', {})
    layernames = set(layers.keys())
    for groupname, patterns in layer_groups.items():
        for layername in common.layers_in_group(groupname, patterns, layernames):
            exportdef = layers[layername].get('publish', None)
            if exportdef is None:
                # layer is not published
                continue
            if isinstance(exportdef, str):
                # normalize the single-name form to a list
                exportdef = [ exportdef ]
            for layername_mvt in exportdef:
                k = (groupname, layername_mvt)
                if k in ret:
                    raise RuntimeError(f'Duplicate key {k}')
                ret[k] = layername
    return ret

SCHEMA_NAME : Final[str] = 'postgis'
def get_query(layername : str) -> bytes:
    """Get GeoJSON-producing query for the given layer.

    The returned statement is UTF-8 encoded and has a single '%s'
    placeholder for the feature ID (column ogc_fid).  Schema and table
    names are passed through common.escape_identifier() before being
    interpolated.
    """
    # TODO don't hardcode geometry column name
    # TODO[trixie]id_column => 'ogc_fid' (but don't hardcode either)
    query = 'SELECT convert_to(ST_AsGeoJSON(m.*,'
    query +=            'geom_column=>\'wkb_geometry\','
    query +=            'pretty_bool=>\'f\'),'
    query +=         '\'UTF8\') AS "GeoJSON" '
    query += 'FROM ('
    query +=   'SELECT l.* '
    query +=   'FROM ' + common.escape_identifier(SCHEMA_NAME)
    query +=       '.' + common.escape_identifier(layername) + ' l '
    query +=   'WHERE l.ogc_fid = %s'
    query += ') m'
    return query.encode('utf-8')


STATUS_OK : Final[str] = '200 OK'
STATUS_BAD_REQUEST : Final[str] = '400 Bad Request'
STATUS_NOT_ALLOWED : Final[str] = '405 Method Not Allowed'
STATUS_INTERNAL_SERVER_ERROR : Final[str] = '500 Internal Server Error'

EMPTY_RESPONSE_HEADERS : Final[list[tuple[str,str]]] = [
    ('Content-Type', 'text/plain'),
    ('Content-Length', '0'),
]
CONTENT_TYPE_JSON : Final[tuple[str,str]] = ('Content-Type', 'application/json; charset=UTF-8')

MAX_FEATURE_COUNT : Final[int] = 500

def application(env, start_response) -> Iterator[bytes]:
    """Main application.

    WSGI entry point.  Expects a POST request with a JSON object body
    holding the keys 'mvt' (str), 'layer' (str) and 'fid' (int), and
    yields the matching feature as UTF-8 encoded GeoJSON (nothing is
    yielded if there is no such feature).

    Response status:
      * 405 if the request method is not POST;
      * 400 if the Content-Type is not application/json, or if the body
        is malformed or refers to an unknown (mvt, layer) pair;
      * 500 on any other failure (for instance a database error);
      * 200 otherwise.

    The request is fully validated and the query executed *before*
    start_response() is called, so that start_response() is called
    exactly once per request and errors are reported with an accurate
    status code.
    """
    if env['REQUEST_METHOD'].upper() != 'POST':
        logging.error('Invalid request method %s', env['REQUEST_METHOD'])
        start_response(STATUS_NOT_ALLOWED, EMPTY_RESPONSE_HEADERS)
        return

    content_type = env.get('CONTENT_TYPE', '').lower()
    if content_type != 'application/json' and not content_type.startswith('application/json;'):
        logging.error('Invalid Content-Type: %s', content_type)
        start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
        return

    # whether start_response() was already called with the success status
    headers_sent = False
    try:
        body = json_load(env['wsgi.input'])
        if not isinstance(body, dict):
            raise ValueError

        mvt = body.get('mvt', None)
        layername = body.get('layer', None)
        if not isinstance(mvt, str) or not isinstance(layername, str):
            raise ValueError
        # unknown (mvt, layer) pairs raise KeyError, a LookupError
        query = QUERY_MAP[TABLE_MAP[(mvt, layername)]]

        fid = body.get('fid', None)
        # bool is a subclass of int: reject JSON true/false explicitly
        # rather than passing them to the database as feature ID
        if isinstance(fid, bool) or not isinstance(fid, int):
            raise ValueError

        # pylint: disable-next=no-member
        with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
            cur.execute(query, params=(fid,))
            resp = cur.fetchone()

        start_response(STATUS_OK, [CONTENT_TYPE_JSON])
        headers_sent = True
        if resp is not None:
            yield resp[0]

    except (JSONDecodeError, LookupError, UnicodeDecodeError, ValueError):
        logging.exception('Invalid request body')
        # start_response(,,sys.exc_info()) should work here, but doesn't
        # because of https://github.com/unbit/uwsgi/issues/2278
        if headers_sent:
            # headers already sent, can't do better; the client will get a 200 status
            # code, but fail to parse the payload as JSON anyway
            raise
        start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
    except Exception: # pylint: disable=broad-exception-caught
        logging.exception('Internal Server Error')
        if headers_sent:
            raise
        start_response(STATUS_INTERNAL_SERVER_ERROR, EMPTY_RESPONSE_HEADERS)

# We could use a psycopg_pool.ConnectionPool() but that would be
# overkill since we only have 2 workers and no threads.  So each worker
# simply opens a (single) connection to PostgreSQL at launch time.
# Use autocommit to avoid starting a transaction, cf.
# https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
PG_CONN = connect('postgresql://webmap_guest@/webmap',
                  autocommit=True,
                  prepare_threshold=0,
                  # TODO[trixie] use cursor_factory=RawCursor
                  # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#cursor-types
                  cursor_factory=Cursor)

@atexit.register
def handler():
    """Gracefully close the connection before terminating the worker"""
    # avoid "AttributeError: 'NoneType' object has no attribute 'connection_summary'"
    # when destructing the object
    # TODO[trixie] reevaluate, possibly related to https://github.com/psycopg/psycopg/issues/591
    PG_CONN.close() # pylint: disable=no-member

common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
# Both maps are computed once at load time; application() only does
# dictionary lookups afterwards.
TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map()
QUERY_MAP : Final[dict[str,bytes]] = { lyr:get_query(lyr) for lyr in set(TABLE_MAP.values()) }

PG_CONN.execute( # pylint: disable=no-member
    'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public',
    prepare=False)
PG_CONN.execute( # pylint: disable=no-member
    'SET statement_timeout TO 15000', # 15s
    prepare=False)

# Drop the names which are only needed during initialization
del sys.modules['common']
del common
del get_query
del get_table_map
del os_path