aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-cgi
blob: b5e5f983ceb4de1d097d2762df0e3a080582e836 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#!/usr/bin/python3

#----------------------------------------------------------------------
# Webmap CGI (Common Gateway Interface) for the Klimatanalys Norr project
# Copyright © 2025 Guilhem Moulin <info@guilhem.se>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# pylint: disable=invalid-name, missing-module-docstring, fixme
# pylint: enable=invalid-name

import sys
from os import path as os_path
from json import load as json_load, dumps as json_dumps, JSONDecodeError
import logging
from typing import Final, Iterator
import atexit

from psycopg import connect, Cursor # pylint: disable=import-error

import common

def get_table_map() -> dict[tuple[str, str], str]:
    """Get mapping of pairs (MVT group name, layername) to table name."""
    mapping : dict[tuple[str, str], str] = {}
    config = common.load_config()
    all_layers = config.get('layers', {})
    known_names = set(all_layers)
    for group, patterns in config.get('layer-groups', {}).items():
        for table in common.layers_in_group(group, patterns, known_names):
            # a layer is published under zero or more MVT layer names
            publish = all_layers[table].get('publish', None)
            if publish is None:
                continue
            mvt_names = [publish] if isinstance(publish, str) else publish
            for mvt_name in mvt_names:
                k = (group, mvt_name)
                if k in mapping:
                    raise RuntimeError(f'Duplicate key {k}')
                mapping[k] = table
    return mapping

SCHEMA_NAME : Final[str] = 'postgis'
def get_query_map(layernames : set[str]) -> dict[str,bytes]:
    """Get GeoJSON-producing query map.

    For every table name in `layernames` (within SCHEMA_NAME) that has a
    registered geometry column and a single-column integer/bigint PRIMARY
    KEY, build a UTF-8 encoded SQL query returning one feature as a single
    jsonb value.  Each query takes three parameters in order:
    (layer_group, layer, fid).  Tables failing the prerequisites are
    skipped with a warning.
    """
    ret = {}
    # pylint: disable-next=no-member
    with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
        for layername in layernames:
            # Look up the table's geometry column(s) in the PostGIS catalog view.
            cur.execute('SELECT f_geometry_column, coord_dimension, srid, type '
                          'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.geometry_columns '
                          'WHERE f_table_schema = %s AND f_table_name = %s',
                        params=(SCHEMA_NAME, layername),
                        prepare=False)
            resp = cur.fetchone()
            if resp is None:
                # not a spatial table, skip silently
                continue
            geom_cols = [ resp[0] ]
            geom_dim = resp[1]
            # Z/M dimensions must be dropped before measuring, cf. geom_col2d_esc below
            force2d = geom_dim > 2
            geom_type = resp[3]
            geom_srid = resp[2]
            if geom_srid != 3006:
                # If the SRS isn't projected and/or isn't in meter units then ST_Area() resp.
                # ST_Length() aren't in m² resp. m.  We could reproject, but since the target
                # SRS is SWEREF 99 we just warn on mismatch
                logging.warning('Geometry column "%s" in table "%s" has SRID %d != 3006',
                                geom_cols[0], layername, geom_srid)
            # classify the geometry by topological dimension to decide which
            # derived measures (area/perimeter resp. length) to expose
            if geom_type in ('POLYGON', 'MULTIPOLYGON',
                             'POLYGONM', 'MULTIPOLYGONM'):
                d = 2 # surface
            elif geom_type in ('LINESTRING', 'MULTILINESTRING',
                               'LINESTRINGM', 'MULTILINESTRINGM'):
                d = 1 # curve
            elif geom_type in ('POINT', 'MULTIPOINT',
                               'POINTM', 'MULTIPOINTM'):
                d = 0 # point
            else:
                logging.warning('Geometry column "%s" in table "%s" has unknown type %s',
                                geom_cols[0], layername, geom_type)
                d = -1

            # drain remaining rows: secondary geometry columns are recorded only
            # so they can be excluded from the attribute list below
            resp = cur.fetchone()
            if resp is not None:
                logging.warning('Table "%s" has multiple geometry columns, '
                                'only considering "%s" (%s, SRID=%d, dim=%d)',
                                layername, geom_cols[0], geom_type, geom_srid, geom_dim)
                while resp is not None:
                    geom_cols.append( resp[0] )
                    resp = cur.fetchone()

            # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
            cur.execute('SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type '
                          'FROM pg_index i '
                            'JOIN pg_attribute a '
                                  ' ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) '
                          'WHERE i.indrelid = %s::regclass AND i.indisprimary',
                        params=(common.escape_identifier(SCHEMA_NAME) + '.' +
                                    common.escape_identifier(layername),),
                        prepare=False)
            resp = cur.fetchone()
            if resp is None:
                logging.warning('Table "%s" has no PRIMARY KEY, ignoring', layername)
                continue
            pkey_col = resp[0]
            # the FID filter below expects an integral key
            if resp[1] not in ('integer', 'bigint'):
                logging.warning('Table "%s" PRIMARY KEY "%s" has invalid type %s, ignoring',
                                layername, pkey_col, resp[1])
                continue
            resp = cur.fetchone()
            if resp is not None:
                logging.warning('Table "%s" PRIMARY KEY spans over multiple columns, ignoring',
                                layername)
                continue

            # collect attribute columns to expose (everything except the key
            # and the geometry columns)
            column_names = []
            cur.execute('SELECT column_name FROM information_schema.columns '
                          'WHERE table_schema = %s AND table_name = %s',
                        params=(SCHEMA_NAME, layername),
                        prepare=False)
            # never empty since we know the table exists and has a primary key
            resp = cur.fetchone()
            while resp is not None:
                c = resp[0]
                # warn when a table column collides with one of the names the
                # query synthesizes (layer_group/layer/ogc_fid and the derived
                # geometry measures)
                # pylint: disable-next=too-many-boolean-expressions
                if (c in ('layer_group', 'layer') or
                        (c == 'ogc_fid' and pkey_col != c) or
                        (d == 2 and c in ('geom_area', 'geom_perimeter')) or
                        (d == 1 and c == 'geom_length')):
                    logging.warning('Duplicate column name "%s"', c)
                if c != pkey_col and c not in geom_cols:
                    column_names.append(c)
                resp = cur.fetchone()

            # assemble the per-table query; identifiers are escaped, values are
            # bound as parameters at execution time
            query = 'WITH feature AS ('
            query +=  'SELECT m.' + common.escape_identifier(pkey_col) + ' AS ogc_fid,'
            for column_name in column_names:
                query +=     'm.' + common.escape_identifier(column_name) + ','
            if force2d:
                geom_col2d_esc = 'ST_Force2D(m.' + common.escape_identifier(geom_cols[0]) + ')'
            else:
                geom_col2d_esc = 'm.' + common.escape_identifier(geom_cols[0])
            if d == 2:
                query +=     'ST_Area(' + geom_col2d_esc +') AS geom_area,'
                query +=     'ST_Perimeter(' + geom_col2d_esc +') AS geom_perimeter,'
            elif d == 1:
                query +=     'ST_Length(' + geom_col2d_esc +') AS geom_length,'
            query +=         '%s AS layer_group,%s AS layer '
            query +=      'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.'
            query +=                common.escape_identifier(layername) + ' m '
            query +=      'WHERE m.' + common.escape_identifier(pkey_col) + ' = %s'
            query += ') '
            # TODO[trixie] use json_serialize() from PostgreSQL 17 to avoid serializing on
            # the Python side.  (There is also row_to_json() which might be of interest if
            # json not jsonb is needed.)
            query += 'SELECT to_jsonb(feature) FROM feature'
            # The query never returns more than one row since we filter on a single FID.
            # TODO: batch queries using ANY[] or an IN set (the # consummer will then need
            # to re-order the response)
            ret[layername] = query.encode('utf-8')
    return ret


# HTTP status lines passed to start_response()
STATUS_OK : Final[str] = '200 OK'
STATUS_BAD_REQUEST : Final[str] = '400 Bad Request'
STATUS_NOT_ALLOWED : Final[str] = '405 Method Not Allowed'
STATUS_INTERNAL_SERVER_ERROR : Final[str] = '500 Internal Server Error'

# headers for bodyless (error) responses
EMPTY_RESPONSE_HEADERS : Final[list[tuple[str,str]]] = [
    ('Content-Type', 'text/plain'),
    ('Content-Length', '0'),
]
# header for successful JSON responses (no Content-Length: body is streamed)
CONTENT_TYPE_JSON : Final[tuple[str,str]] = ('Content-Type', 'application/json; charset=UTF-8')

# cap on the number of feature requests honored per query; excess is truncated
MAX_FEATURE_COUNT : Final[int] = 500
def application(env, start_response) -> Iterator[bytes]:
    """Main application.

    WSGI entry point: accepts a POST with a JSON array of objects carrying
    'layer_group', 'layer' (strings) and 'fid' (integer), and streams back
    a JSON array with one serialized feature per matched request (missing
    features are silently omitted, so the response may be shorter than the
    request).
    """
    if env['REQUEST_METHOD'].upper() != 'POST':
        logging.error('Invalid request method %s', env['REQUEST_METHOD'])
        start_response(STATUS_NOT_ALLOWED, EMPTY_RESPONSE_HEADERS)
        return

    # accept 'application/json' with optional parameters (e.g. ';charset=…')
    content_type = env.get('CONTENT_TYPE', '').lower()
    if content_type != 'application/json' and not content_type.startswith('application/json;'):
        logging.error('Invalid Content-Type: %s', content_type)
        start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
        return

    # 'first' tracks whether any body bytes have been yielded yet: once they
    # have, the status line is out the door and errors can no longer be
    # reported via start_response()
    first = True
    try:
        body = json_load(env['wsgi.input'])
        if not isinstance(body, list):
            raise ValueError

        start_response(STATUS_OK, [CONTENT_TYPE_JSON])
        if not body:
            yield b'[]'
            return

        if len(body) > MAX_FEATURE_COUNT:
            logging.warning('Query has too many feature requests (%d), truncating to %d',
                            len(body), MAX_FEATURE_COUNT)
            body = body[:MAX_FEATURE_COUNT]

        # pylint: disable-next=no-member
        with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
            for item in body:
                if not isinstance(item, dict):
                    raise ValueError
                layer_group = item.get('layer_group', None)
                layer = item.get('layer', None)
                if not isinstance(layer_group, str) or not isinstance(layer, str):
                    raise ValueError
                # a KeyError here (unknown group/layer) is caught below as
                # LookupError and yields a 400
                query = QUERY_MAP[TABLE_MAP[(layer_group, layer)]]
                fid = item.get('fid', None)
                if not isinstance(fid, int):
                    raise ValueError
                cur.execute(query, params=(layer_group, layer, fid))
                resp = cur.fetchone()
                if resp is None:
                    continue # no match for this tuple
                if first:
                    yield b'['
                    first = False
                else:
                    yield b','
                yield json_dumps(resp[0], ensure_ascii=False, separators=(',', ':')).encode('utf-8')
                # the query never returns more than one row since we filter on a single FID
        if first:
            yield b'[]' # no match, empty response
            first = False
        else:
            yield b']'

    except (JSONDecodeError, LookupError, UnicodeDecodeError, ValueError) as exc:
        logging.exception('Invalid request body')
        # start_response(,,sys.exc_info()) should work here, but doesn't
        # because of https://github.com/unbit/uwsgi/issues/2278
        if first:
            start_response(STATUS_BAD_REQUEST, EMPTY_RESPONSE_HEADERS)
        else:
            # headers already sent, can't do better; the client will get a 200 status
            # code, but fail to parse the payload as JSON anyway
            exc_info = sys.exc_info()
            raise exc_info[1].with_traceback(exc_info[2]) from exc
    except Exception as exc: # pylint: disable=broad-exception-caught
        logging.exception('Internal Server Error')
        if first:
            start_response(STATUS_INTERNAL_SERVER_ERROR, EMPTY_RESPONSE_HEADERS)
        else:
            # same uwsgi workaround as above
            exc_info = sys.exc_info()
            raise exc_info[1].with_traceback(exc_info[2]) from exc

# We could use a psycopg_pool.ConnectionPool() but that would be
# overkill since we only have 2 workers and no threads.  So each worker
# simply opens a (single) connection to PostgreSQL at launch time.
# Use autocommit to avoid starting a transaction, cf.
# https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
# prepare_threshold=0 would prepare every statement; queries that must not be
# server-side prepared pass prepare=False explicitly at the call site.
PG_CONN = connect('postgresql://webmap_guest@/webmap',
                  autocommit=True,
                  prepare_threshold=0,
                  # TODO[trixie] use cursor_factory=RawCursor
                  # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#cursor-types
                  cursor_factory=Cursor)

@atexit.register
def handler():
    """Gracefully close the connection before terminating the worker"""
    # avoid "AttributeError: 'NoneType' object has no attribute 'connection_summary'"
    # when destructing the object
    # TODO[trixie] reevaluate, possibly related to https://github.com/psycopg/psycopg/issues/591
    PG_CONN.close() # pylint: disable=no-member

# per-worker initialization: logging, then the static lookup tables built
# from the configuration and the database catalogs
common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map()
QUERY_MAP : Final[dict[str, bytes]] = get_query_map(set(TABLE_MAP.values()))

# session-level settings for the lifetime of the worker's connection
PG_CONN.execute( # pylint: disable=no-member
    'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public',
    prepare=False)
PG_CONN.execute( # pylint: disable=no-member
    'SET statement_timeout TO 15000', # 15s
    prepare=False)

# drop functions and modules we don't need anymore (they were only used for
# the one-time initialization above, so free the memory in each worker)
del common
del sys.modules['common']
del get_query_map
del get_table_map
del os_path