diff options
| author | Guilhem Moulin <guilhem@fripost.org> | 2025-05-25 14:50:33 +0200 | 
|---|---|---|
| committer | Guilhem Moulin <guilhem@fripost.org> | 2025-05-25 14:50:33 +0200 | 
| commit | c8720fb9d71a2fb44713cc628862c795fa831576 (patch) | |
| tree | e6eb3bf3312f2f4d8edff65737e1f5d86def2ba9 /webmap-cgi | |
| parent | 527fdfd1e46d66007758861dd0decf9c03043d0a (diff) | |
CGI: Don't hardcode geometry and primary key columns.
Diffstat (limited to 'webmap-cgi')
| -rwxr-xr-x | webmap-cgi | 87 | 
1 files changed, 66 insertions, 21 deletions
| @@ -54,25 +54,70 @@ def get_table_map() -> dict[tuple[str, str], str]:      return ret  SCHEMA_NAME : Final[str] = 'postgis' -def get_query(layername : str) -> bytes: -    """Get GeoJSON-producing query for the given layer.""" -    # TODO don't hardcode geometry column name -    # TODO[trixie]id_column => 'ogc_fid' (but don't hardcode either) -    query = 'SELECT convert_to(ST_AsGeoJSON(m.*,' -    query +=                                'geom_column=>\'wkb_geometry\',' -    query +=                                'pretty_bool=>\'f\'),' -    query +=                   '\'UTF8\') AS "GeoJSON" ' -    query +=  'FROM (' -    query +=    'SELECT l.*, %s AS layer_group, %s AS layer ' -    query +=      'FROM ' + common.escape_identifier(SCHEMA_NAME) -    query +=            '.' + common.escape_identifier(layername) + ' l ' -    query +=      'WHERE l.ogc_fid = %s' -    query +=  ') m' -    # The query never returns more than one row since we filter on a single FID. -    # Don't try to be clever and batch queries in an IN set or ANY as we -    # want to preserve the order in the response (so the feature(s) -    # exactly under the cursor are returned first). -    return query.encode('utf-8') +def get_query_map(layernames : set[str]) -> dict[str,bytes]: +    """Get GeoJSON-producing query map.""" +    ret = {} +    # pylint: disable-next=no-member +    with PG_CONN.cursor(binary=True, scrollable=False, withhold=False) as cur: +        for layername in layernames: +            cur.execute('SELECT f_geometry_column ' +                          'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.geometry_columns ' +                          'WHERE f_table_schema = %s AND f_table_name = %s', +                        params=(SCHEMA_NAME, layername), +                        prepare=False) +            resp = cur.fetchone() +            if resp is None: +                continue +            geom_col = resp[0] +            resp = cur.fetchone() +            if resp is not None: +                logging.warning('Table "%s" has multiple geometry colums, ' +                                'only considering "%s"', +                                layername, geom_col) + +            # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns +            cur.execute('SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type ' +                          'FROM pg_index i ' +                            'JOIN pg_attribute a ' +                                  ' ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) ' +                          'WHERE i.indrelid = %s::regclass AND i.indisprimary', +                        params=(common.escape_identifier(SCHEMA_NAME) + '.' + +                                    common.escape_identifier(layername),), +                        prepare=False) +            resp = cur.fetchone() +            if resp is None: +                logging.warning('Table "%s" has no PRIMARY KEY, ignoring', layername) +                continue +            pkey_col = resp[0] +            if resp[1] not in ('integer', 'bigint'): +                logging.warning('Table "%s" PRIMARY KEY "%s" has invalid type %s, ignoring', +                                layername, pkey_col, resp[1]) +                continue +            resp = cur.fetchone() +            if resp is not None: +                logging.warning('Table "%s" PRIMARY KEY spans over multiple columns, ignoring', +                                layername) +                continue + +            # TODO[trixie] id_column => 'ogc_fid' (but don't hardcode either) +            query = 'WITH lyr AS (' +            query +=  'SELECT m.*, %s AS layer_group, %s AS layer ' +            query +=      'FROM ' + common.escape_identifier(SCHEMA_NAME) + '.' +            query +=                common.escape_identifier(layername) + ' m ' +            query +=      'WHERE m.' + common.escape_identifier(pkey_col) + ' = %s' +            query += ') ' +            query += 'SELECT convert_to(' +            query +=    'ST_AsGeoJSON(lyr.*,' +            query +=      'geom_column=>' + common.escape_literal_str(geom_col) + ',' +            query +=      'pretty_bool=>\'f\'),' +            query +=    '\'UTF8\') AS "GeoJSON" ' +            query +=   'FROM lyr' +            # The query never returns more than one row since we filter on a single FID. +            # Don't try to be clever and batch queries in an IN set or ANY as we +            # want to preserve the order in the response (so the feature(s) +            # exactly under the cursor are returned first). +            ret[layername] = query.encode('utf-8') +    return ret  STATUS_OK : Final[str] = '200 OK' @@ -187,7 +232,7 @@ def handler():  common.init_logger(app=os_path.basename(__file__), level=logging.INFO)  TABLE_MAP : Final[dict[tuple[str, str], str]] = get_table_map() -QUERY_MAP : Final[dict[str,bytes]] = { lyr:get_query(lyr) for lyr in set(TABLE_MAP.values()) } +QUERY_MAP : Final[dict[str, bytes]] = get_query_map(set(TABLE_MAP.values()))  PG_CONN.execute( # pylint: disable=no-member      'SET search_path TO ' + common.escape_identifier(SCHEMA_NAME) + ',public', @@ -199,6 +244,6 @@ PG_CONN.execute( # pylint: disable=no-member  # drop functions and modules we don't need anymore  del common  del sys.modules['common'] -del get_query +del get_query_map  del get_table_map  del os_path | 
