aboutsummaryrefslogtreecommitdiffstats
path: root/webmap-cgi
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-05-25 14:50:33 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-05-25 14:50:33 +0200
commitc8720fb9d71a2fb44713cc628862c795fa831576 (patch)
treee6eb3bf3312f2f4d8edff65737e1f5d86def2ba9 /webmap-cgi
parent527fdfd1e46d66007758861dd0decf9c03043d0a (diff)
CGI: Don't hardcode geometry and primary key columns.
Diffstat (limited to 'webmap-cgi')
-rwxr-xr-xwebmap-cgi87
1 files changed, 66 insertions, 21 deletions
diff --git a/webmap-cgi b/webmap-cgi
index c32a389..86a0566 100755
--- a/webmap-cgi
+++ b/webmap-cgi
@@ -54,25 +54,70 @@ def get_table_map() -> dict[tuple[str, str], str]:
return ret
SCHEMA_NAME : Final[str] = 'postgis'
-def get_query(layername : str) -> bytes:
- """Get GeoJSON-producing query for the given layer."""
- # TODO don't hardcode geometry column name
- # TODO[trixie]id_column => 'ogc_fid' (but don't hardcode either)
- query = 'SELECT convert_to(ST_AsGeoJSON(m.*,'
- query += 'geom_column=>\'wkb_geometry\','
- query += 'pretty_bool=>\'f\'),'
- query += '\'UTF8\') AS "GeoJSON" '
- query += 'FROM ('
- query += 'SELECT l.*, %s AS layer_group, %s AS layer '
- query += 'FROM ' + common.escape_identifier(SCHEMA_NAME)
- query += '.' + common.escape_identifier(layername) + ' l '
- query += 'WHERE l.ogc_fid = %s'
- query += ') m'
- # The query never returns more than one row since we filter on a single FID.
- # Don't try to be clever and batch queries in an IN set or ANY as we
- # want to preserve the order in the response (so the feature(s)
- # exactly under the cursor are returned first).
- return query.encode('utf-8')
+def get_query_map(layernames : set[str]) -> dict[str,bytes]:
+ """Get GeoJSON-producing query map."""
+ ret = {}
+ # pylint: disable-next=no-member
+ with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur:
+ for layername in layernames:
+ cur.execute('SELECT f_geometry_column '
+ 'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.geometry_columns '
+ 'WHERE f_table_schema = %s AND f_table_name = %s',
+ params=(SCHEMA_NAME, layername),
+ prepare=False)
+ resp = cur.fetchone()
+ if resp is None:
+ continue
+ geom_col = resp[0]
+ resp = cur.fetchone()
+ if resp is not None:
+ logging.warning('Table "%s" has multiple geometry colums, '
+ 'only considering "%s"',
+ layername, geom_col)
+
+ # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
+ cur.execute('SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type '
+ 'FROM pg_index i '
+ 'JOIN pg_attribute a '
+ ' ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) '
+ 'WHERE i.indrelid = %s::regclass AND i.indisprimary',
+ params=(common.escape_identifier(SCHEMA_NAME) + '.' +
+ common.escape_identifier(layername),),
+ prepare=False)
+ resp = cur.fetchone()
+ if resp is None:
+ logging.warning('Table "%s" has no PRIMARY KEY, ignoring', layername)
+ continue
+ pkey_col = resp[0]
+ if resp[1] not in ('integer', 'bigint'):
+ logging.warning('Table "%s" PRIMARY KEY "%s" has invalid type %s, ignoring',
+ layername, pkey_col, resp[1])
+ continue
+ resp = cur.fetchone()
+ if resp is not None:
+ logging.warning('Table "%s" PRIMARY KEY spans over multiple columns, ignoring',
+ layername)
+ continue
+
+ # TODO[trixie] id_column => 'ogc_fid' (but don't hardcode either)
+ query = 'WITH lyr AS ('
+ query += 'SELECT m.*, %s AS layer_group, %s AS layer '
+ query += 'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.'
+ query += common.escape_identifier(layername) + ' m '
+ query += 'WHERE m.' + common.escape_identifier(pkey_col) + ' = %s'
+ query += ') '
+ query += 'SELECT convert_to('
+ query += 'ST_AsGeoJSON(lyr.*,'
+ query += 'geom_column=>' + common.escape_literal_str(geom_col) + ','
+ query += 'pretty_bool=>\'f\'),'
+ query += '\'UTF8\') AS "GeoJSON" '
+ query += 'FROM lyr'
+ # The query never returns more than one row since we filter on a single FID.
+ # Don't try to be clever and batch queries in an IN set or ANY as we
+ # want to preserve the order in the response (so the feature(s)
+ # exactly under the cursor are returned first).
+ ret[layername] = query.encode('utf-8')
+ return ret
STATUS_OK : Final[str] = '200 OK'
@@ -187,7 +232,7 @@ def handler():
common.init_logger(app=os_path.basename(__file__), level=logging.INFO)
TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map()
-QUERY_MAP : Final[dict[str,bytes]] = { lyr:get_query(lyr) for lyr in set(TABLE_MAP.values()) }
+QUERY_MAP : Final[dict[str, bytes]] = get_query_map(set(TABLE_MAP.values()))
PG_CONN.execute( # pylint: disable=no-member
'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public',
@@ -199,6 +244,6 @@ PG_CONN.execute( # pylint: disable=no-member
# drop functions and modules we don't need anymore
del common
del sys.modules['common']
-del get_query
+del get_query_map
del get_table_map
del os_path