diff options
author | Guilhem Moulin <guilhem@fripost.org> | 2025-05-25 14:50:33 +0200 |
---|---|---|
committer | Guilhem Moulin <guilhem@fripost.org> | 2025-05-25 14:50:33 +0200 |
commit | c8720fb9d71a2fb44713cc628862c795fa831576 (patch) | |
tree | e6eb3bf3312f2f4d8edff65737e1f5d86def2ba9 /webmap-cgi | |
parent | 527fdfd1e46d66007758861dd0decf9c03043d0a (diff) |
CGI: Don't hardcode geometry and primary key columns.
Diffstat (limited to 'webmap-cgi')
-rwxr-xr-x | webmap-cgi | 87 |
1 files changed, 66 insertions, 21 deletions
@@ -54,25 +54,70 @@ def get_table_map() -> dict[tuple[str, str], str]: return ret SCHEMA_NAME : Final[str] = 'postgis' -def get_query(layername : str) -> bytes: - """Get GeoJSON-producing query for the given layer.""" - # TODO don't hardcode geometry column name - # TODO[trixie]id_column => 'ogc_fid' (but don't hardcode either) - query = 'SELECT convert_to(ST_AsGeoJSON(m.*,' - query += 'geom_column=>\'wkb_geometry\',' - query += 'pretty_bool=>\'f\'),' - query += '\'UTF8\') AS "GeoJSON" ' - query += 'FROM (' - query += 'SELECT l.*, %s AS layer_group, %s AS layer ' - query += 'FROM ' + common.escape_identifier(SCHEMA_NAME) - query += '.' + common.escape_identifier(layername) + ' l ' - query += 'WHERE l.ogc_fid = %s' - query += ') m' - # The query never returns more than one row since we filter on a single FID. - # Don't try to be clever and batch queries in an IN set or ANY as we - # want to preserve the order in the response (so the feature(s) - # exactly under the cursor are returned first). - return query.encode('utf-8') +def get_query_map(layernames : set[str]) -> dict[str,bytes]: + """Get GeoJSON-producing query map.""" + ret = {} + # pylint: disable-next=no-member + with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur: + for layername in layernames: + cur.execute('SELECT f_geometry_column ' + 'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.geometry_columns ' + 'WHERE f_table_schema = %s AND f_table_name = %s', + params=(SCHEMA_NAME, layername), + prepare=False) + resp = cur.fetchone() + if resp is None: + continue + geom_col = resp[0] + resp = cur.fetchone() + if resp is not None: + logging.warning('Table "%s" has multiple geometry colums, ' + 'only considering "%s"', + layername, geom_col) + + # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns + cur.execute('SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type ' + 'FROM pg_index i ' + 'JOIN pg_attribute a ' + ' ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) ' + 'WHERE i.indrelid = %s::regclass AND i.indisprimary', + params=(common.escape_identifier(SCHEMA_NAME) + '.' + + common.escape_identifier(layername),), + prepare=False) + resp = cur.fetchone() + if resp is None: + logging.warning('Table "%s" has no PRIMARY KEY, ignoring', layername) + continue + pkey_col = resp[0] + if resp[1] not in ('integer', 'bigint'): + logging.warning('Table "%s" PRIMARY KEY "%s" has invalid type %s, ignoring', + layername, pkey_col, resp[1]) + continue + resp = cur.fetchone() + if resp is not None: + logging.warning('Table "%s" PRIMARY KEY spans over multiple columns, ignoring', + layername) + continue + + # TODO[trixie] id_column => 'ogc_fid' (but don't hardcode either) + query = 'WITH lyr AS (' + query += 'SELECT m.*, %s AS layer_group, %s AS layer ' + query += 'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.' + query += common.escape_identifier(layername) + ' m ' + query += 'WHERE m.' + common.escape_identifier(pkey_col) + ' = %s' + query += ') ' + query += 'SELECT convert_to(' + query += 'ST_AsGeoJSON(lyr.*,' + query += 'geom_column=>' + common.escape_literal_str(geom_col) + ',' + query += 'pretty_bool=>\'f\'),' + query += '\'UTF8\') AS "GeoJSON" ' + query += 'FROM lyr' + # The query never returns more than one row since we filter on a single FID. + # Don't try to be clever and batch queries in an IN set or ANY as we + # want to preserve the order in the response (so the feature(s) + # exactly under the cursor are returned first). + ret[layername] = query.encode('utf-8') + return ret STATUS_OK : Final[str] = '200 OK' @@ -187,7 +232,7 @@ def handler(): common.init_logger(app=os_path.basename(__file__), level=logging.INFO) TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map() -QUERY_MAP : Final[dict[str,bytes]] = { lyr:get_query(lyr) for lyr in set(TABLE_MAP.values()) } +QUERY_MAP : Final[dict[str, bytes]] = get_query_map(set(TABLE_MAP.values())) PG_CONN.execute( # pylint: disable=no-member 'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public', @@ -199,6 +244,6 @@ PG_CONN.execute( # pylint: disable=no-member # drop functions and modules we don't need anymore del common del sys.modules['common'] -del get_query +del get_query_map del get_table_map del os_path |