#!/usr/bin/python3

#----------------------------------------------------------------------
# Webmap CGI (Common Gateway Interface) for the Klimatanalys Norr project
# Copyright © 2025 Guilhem Moulin
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme
# pylint: enable=invalid-name

import sys
from os import path as os_path
from json import load as json_load, dumps as json_dumps, JSONDecodeError
import logging
from typing import Final, Iterator
import atexit

from psycopg import connect, Cursor # pylint: disable=import-error

import common


def get_table_map() -> dict[tuple[str, str], str]:
    """Get mapping of pairs (MVT group name, layername) to table name.

    Raises RuntimeError if two layers publish under the same
    (group, MVT layer name) pair.
    """
    ret = {}
    config = common.load_config()
    layer_groups = config.get('layer-groups', {})
    layers = config.get('layers', {})
    layernames = set(layers.keys())
    for groupname, patterns in layer_groups.items():
        for layername in common.layers_in_group(groupname, patterns, layernames):
            exportdef = layers[layername].get('publish', None)
            if exportdef is None:
                # layer is not published, skip it
                continue
            if isinstance(exportdef, str):
                # normalize single export name to a list
                exportdef = [ exportdef ]
            for layername_mvt in exportdef:
                k = (groupname, layername_mvt)
                if k in ret:
                    raise RuntimeError(f'Duplicate key {k}')
                ret[k] = layername
    return ret


SCHEMA_NAME : Final[str] = 'postgis'

def get_query_map(layernames : set[str]) -> dict[str,bytes]:
    """Get GeoJSON-producing query map.

    For each table in `layernames`, introspect its geometry column(s),
    PRIMARY KEY and attribute columns, and build a parameterized query
    returning a single feature (as jsonb) for a given FID.  Tables
    without a suitable single-column integer PRIMARY KEY are skipped
    with a warning.
    """
    ret = {}
    # pylint: disable-next=no-member
    with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
        for layername in layernames:
            cur.execute('SELECT f_geometry_column, coord_dimension, srid, type '
                        'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.geometry_columns '
                        'WHERE f_table_schema = %s AND f_table_name = %s',
                params=(SCHEMA_NAME, layername),
                prepare=False)
            resp = cur.fetchone()
            if resp is None:
                # no geometry column registered for this table
                continue
            geom_cols = [ resp[0] ]
            geom_dim = resp[1]
            force2d = geom_dim > 2
            geom_type = resp[3]
            geom_srid = resp[2]
            if geom_srid != 3006:
                # If the SRS isn't projected and/or isn't in meter units then ST_Area() resp.
                # ST_Length() aren't in m² resp. m. We could reproject, but since the target
                # SRS is SWEREF 99 we just warn on mismatch
                logging.warning('Geometry column "%s" in table "%s" has SRID %d != 3006',
                                geom_cols[0], layername, geom_srid)
            # classify geometry dimension to decide which derived measures to add
            if geom_type in ('POLYGON', 'MULTIPOLYGON', 'POLYGONM', 'MULTIPOLYGONM'):
                d = 2 # surface
            elif geom_type in ('LINESTRING', 'MULTILINESTRING',
                               'LINESTRINGM', 'MULTILINESTRINGM'):
                d = 1 # curve
            elif geom_type in ('POINT', 'MULTIPOINT', 'POINTM', 'MULTIPOINTM'):
                d = 0 # point
            else:
                logging.warning('Geometry column "%s" in table "%s" has unknown type %s',
                                geom_cols[0], layername, geom_type)
                d = -1
            resp = cur.fetchone()
            if resp is not None:
                logging.warning('Table "%s" has multiple geometry columns, '
                                'only considering "%s" (%s, SRID=%d, dim=%d)',
                                layername, geom_cols[0], geom_type, geom_srid, geom_dim)
            while resp is not None:
                # remember extra geometry columns so they get excluded from the output
                geom_cols.append( resp[0] )
                resp = cur.fetchone()

            # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
            cur.execute('SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type '
                        'FROM pg_index i '
                        'JOIN pg_attribute a '
                        ' ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) '
                        'WHERE i.indrelid = %s::regclass AND i.indisprimary',
                params=(common.escape_identifier(SCHEMA_NAME) + '.'
                        + common.escape_identifier(layername),),
                prepare=False)
            resp = cur.fetchone()
            if resp is None:
                logging.warning('Table "%s" has no PRIMARY KEY, ignoring', layername)
                continue
            pkey_col = resp[0]
            if resp[1] not in ('integer', 'bigint'):
                logging.warning('Table "%s" PRIMARY KEY "%s" has invalid type %s, ignoring',
                                layername, pkey_col, resp[1])
                continue
            resp = cur.fetchone()
            if resp is not None:
                logging.warning('Table "%s" PRIMARY KEY spans over multiple columns, ignoring',
                                layername)
                continue

            column_names = []
            cur.execute('SELECT column_name FROM information_schema.columns '
                        'WHERE table_schema = %s AND table_name = %s',
                params=(SCHEMA_NAME, layername),
                prepare=False)
            # never empty since we know the table exists and has a primary key
            resp = cur.fetchone()
            while resp is not None:
                c = resp[0]
                # warn about columns that collide with names the query generates itself
                # pylint: disable-next=too-many-boolean-expressions
                if (c in ('layer_group', 'layer') or
                        (c == 'ogc_fid' and pkey_col != c) or
                        (d == 2 and c in ('geom_area', 'geom_perimeter')) or
                        (d == 1 and c == 'geom_length')):
                    logging.warning('Duplicate column name "%s"', c)
                if c != pkey_col and c not in geom_cols:
                    column_names.append(c)
                resp = cur.fetchone()

            # assemble the single-feature query; identifiers are escaped, values
            # (layer_group, layer, fid) are left as placeholders for execution time
            query = 'WITH feature AS ('
            query += 'SELECT m.' + common.escape_identifier(pkey_col) + ' AS ogc_fid,'
            for column_name in column_names:
                query += 'm.' + common.escape_identifier(column_name) + ','
            if force2d:
                # geometry has >2 dimensions; measures are computed on the 2D projection
                geom_col2d_esc = 'ST_Force2D(m.' + common.escape_identifier(geom_cols[0]) + ')'
            else:
                geom_col2d_esc = 'm.' + common.escape_identifier(geom_cols[0])
            if d == 2:
                query += 'ST_Area(' + geom_col2d_esc + ') AS geom_area,'
                query += 'ST_Perimeter(' + geom_col2d_esc + ') AS geom_perimeter,'
            elif d == 1:
                query += 'ST_Length(' + geom_col2d_esc + ') AS geom_length,'
            query += '%s AS layer_group,%s AS layer '
            query += 'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.'
            query += common.escape_identifier(layername) + ' m '
            query += 'WHERE m.' + common.escape_identifier(pkey_col) + ' = %s'
            query += ') '
            # TODO[trixie] use json_serialize() from PostgreSQL 17 to avoid serializing on
            # the Python side. (There is also row_to_json() which might be of interest if
            # json not jsonb is needed.)
            query += 'SELECT to_jsonb(feature) FROM feature'
            # The query never returns more than one row since we filter on a single FID.
            # TODO: batch queries using ANY[] or an IN set (the consumer will then need
            # to re-order the response)
            ret[layername] = query.encode('utf-8')
    return ret


STATUS_OK : Final[str] = '200 OK'
STATUS_BAD_REQUEST : Final[str] = '400 Bad Request'
STATUS_NOT_ALLOWED : Final[str] = '405 Method Not Allowed'
STATUS_INTERNAL_SERVER_ERROR : Final[str] = '500 Internal Server Error'

EMPTY_RESPONSE_HEADERS : Final[list[tuple[str,str]]] = [
    ('Content-Type', 'text/plain'),
    ('Content-Length', '0'),
]
CONTENT_TYPE_JSON : Final[tuple[str,str]] = ('Content-Type',
                                             'application/json; charset=UTF-8')

# upper bound on feature requests per POST; larger bodies are truncated
MAX_FEATURE_COUNT : Final[int] = 500

def application(env, start_response) -> Iterator[bytes]:
    """Main application.

    WSGI entry point: accepts a POSTed JSON array of feature requests
    ({layer_group, layer, fid} objects) and streams back a JSON array of
    matching features.  Invalid requests yield 400, unexpected failures
    500 — unless headers were already sent, in which case the exception
    is re-raised and the client receives a truncated (unparsable) body.
    """
    if env['REQUEST_METHOD'].upper() != 'POST':
        logging.error('Invalid request method %s', env['REQUEST_METHOD'])
        start_response(STATUS_NOT_ALLOWED, EMPTY_RESPONSE_HEADERS)
        return

    content_type = env.get('CONTENT_TYPE', '').lower()
    if content_type != 'application/json' and not content_type.startswith('application/json;'):
        logging.error('Invalid Content-Type: %s', content_type)
        start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
        return

    # 'first' doubles as "headers/输出 not started yet" flag for error handling
    first = True
    try:
        body = json_load(env['wsgi.input'])
        if not isinstance(body, list):
            raise ValueError('request body is not a JSON array')
        start_response(STATUS_OK, [CONTENT_TYPE_JSON])
        if not body:
            yield b'[]'
            return
        if len(body) > MAX_FEATURE_COUNT:
            logging.warning('Query has too many feature requests (%d), truncating to %d',
                            len(body), MAX_FEATURE_COUNT)
            body = body[:MAX_FEATURE_COUNT]
        # pylint: disable-next=no-member
        with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
            for item in body:
                if not isinstance(item, dict):
                    raise ValueError('feature request is not a JSON object')
                layer_group = item.get('layer_group', None)
                layer = item.get('layer', None)
                if not isinstance(layer_group, str) or not isinstance(layer, str):
                    raise ValueError('layer_group/layer is not a string')
                query = QUERY_MAP[TABLE_MAP[(layer_group, layer)]]
                fid = item.get('fid', None)
                if not isinstance(fid, int):
                    raise ValueError('fid is not an integer')
                cur.execute(query, params=(layer_group, layer, fid))
                resp = cur.fetchone()
                if resp is None:
                    continue # no match for this tuple
                if first:
                    yield b'['
                    first = False
                else:
                    yield b','
                yield json_dumps(resp[0], ensure_ascii=False,
                                 separators=(',', ':')).encode('utf-8')
                # the query never returns more than one row since we filter on a single FID
        if first:
            yield b'[]' # no match, empty response
            first = False
        else:
            yield b']'
    except (JSONDecodeError, LookupError, UnicodeDecodeError, ValueError):
        logging.exception('Invalid request body')
        # start_response(,,sys.exc_info()) should work here, but doesn't
        # because of https://github.com/unbit/uwsgi/issues/2278
        if first:
            start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
        else:
            # headers already sent, can't do better; the client will get a 200 status
            # code, but fail to parse the payload as JSON anyway
            raise
    except Exception: # pylint: disable=broad-exception-caught
        logging.exception('Internal Server Error')
        if first:
            start_response(STATUS_INTERNAL_SERVER_ERROR, EMPTY_RESPONSE_HEADERS)
        else:
            # headers already sent, can't downgrade the status anymore
            raise


# We could use a psycopg_pool.ConnectionPool() but that would be
# overkill since we only have 2 workers and no threads. So each worker
# simply opens a (single) connection to PostgreSQL at launch time.
# Use autocommit to avoid starting a transaction, cf.
# https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
PG_CONN = connect('postgresql://webmap_guest@/webmap',
                  autocommit=True,
                  prepare_threshold=0,
                  # TODO[trixie] use cursor_factory=RawCursor
                  # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#cursor-types
                  cursor_factory=Cursor)

@atexit.register
def handler():
    """Gracefully close the connection before terminating the worker"""
    # avoid "AttributeError: 'NoneType' object has no attribute 'connection_summary'"
    # when destructing the object
    # TODO[trixie] reevaluate, possibly related to https://github.com/psycopg/psycopg/issues/591
    PG_CONN.close() # pylint: disable=no-member

common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map()
QUERY_MAP : Final[dict[str, bytes]] = get_query_map(set(TABLE_MAP.values()))

PG_CONN.execute( # pylint: disable=no-member
    'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public',
    prepare=False)
PG_CONN.execute( # pylint: disable=no-member
    'SET statement_timeout TO 15000', # 15s
    prepare=False)

# drop functions and modules we don't need anymore
del common
del sys.modules['common']
del get_query_map
del get_table_map
del os_path