aboutsummaryrefslogtreecommitdiffstats
path: root/common_gdal.py
diff options
context:
space:
mode:
authorGuilhem Moulin <guilhem@fripost.org>2025-04-19 05:28:42 +0200
committerGuilhem Moulin <guilhem@fripost.org>2025-04-21 17:58:41 +0200
commitc33799f69e7eb42cb0ab4735c7e878d74faca16a (patch)
tree979d9182af70a735f5b531f370360f081f0e8749 /common_gdal.py
parent6bd4f5f19928cd2783defca0316bcb6bbc042cd2 (diff)
webmap-import: Break down into separate modules.
Diffstat (limited to 'common_gdal.py')
-rw-r--r--common_gdal.py352
1 files changed, 352 insertions, 0 deletions
diff --git a/common_gdal.py b/common_gdal.py
new file mode 100644
index 0000000..aa451ae
--- /dev/null
+++ b/common_gdal.py
@@ -0,0 +1,352 @@
+#!/usr/bin/python3
+
+#----------------------------------------------------------------------
+# Backend utilities for the Klimatanalys Norr project (common GDAL functions)
+# Copyright © 2024-2025 Guilhem Moulin <info@guilhem.se>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#----------------------------------------------------------------------
+
+# pylint: disable=invalid-name, missing-module-docstring
+
+import logging
+import math
+import re
+from typing import Any, Optional
+
+from osgeo import gdal, ogr, osr
+
+from common import BadConfiguration
+
+# pylint: disable-next=redefined-builtin
+def gdalVersionMin(maj : int = 0, min : int = 0, rev : int = 0) -> bool:
+ """Return a boolean indicating whether the installer GDAL version is
+ greater than or equal to the provider (maj, min, rev) triplet."""
+
+ if maj < 1 or (maj == 1 and min < 10):
+ # GDAL_VERSION_NUM() macro was changed in 1.10. That version
+ # was released in 2013 so we blindly assume the installer
+ # version is more recent
+ return True
+
+ version_cur = int(gdal.VersionInfo())
+ # cf. GDAL_COMPUTE_VERSION(maj,min,rev) in gcore/gdal_version.h.in
+ version_min = maj*1000000 + min*10000 + rev*100
+ return version_min <= version_cur
+
+def gdalGetMetadataItem(obj : gdal.MajorObject, k : str) -> bool:
+ """Wrapper around gdal.MajorObject.GetMetadataItem(name)."""
+
+ v = obj.GetMetadataItem(k)
+ if v is not None and isinstance(v, str):
+ return v.upper() == 'YES'
+
+ return False
+
+# pylint: disable-next=dangerous-default-value
+def gdalSetOpenExArgs(option_dict : Optional[dict[str, Any]] = {},
+ flags : int = 0) -> tuple[dict[str, int|list[str]], gdal.Driver]:
+ """Return a pair kwargs and driver to use with gdal.OpenEx()."""
+
+ kwargs = { 'nOpenFlags': gdal.OF_VECTOR | flags }
+
+ fmt = option_dict.get('format', None)
+ if fmt is None:
+ drv = None
+ else:
+ drv = gdal.GetDriverByName(fmt)
+ if drv is None:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Unknown driver name "{fmt}"')
+ if not gdalGetMetadataItem(drv, gdal.DCAP_VECTOR):
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Driver "{drv.ShortName}" has no vector capabilities')
+ kwargs['allowed_drivers'] = [ drv.ShortName ]
+
+ oo = option_dict.get('open-options', None)
+ if oo is not None:
+ kwargs['open_options'] = [ k + '=' + str(v) for k, v in oo.items() ]
+ return kwargs, drv
+
+def getSRS(srs_str : Optional[str]) -> osr.SpatialReference:
+ """Return the decoded Spatial Reference System."""
+
+ if srs_str is None:
+ return None
+
+ srs = osr.SpatialReference()
+ if srs_str.startswith('EPSG:'):
+ code = int(srs_str.removeprefix('EPSG:'))
+ srs.ImportFromEPSG(code)
+ else:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Unknown SRS {srs_str}')
+
+ logging.debug('Default SRS: "%s" (%s)', srs.ExportToProj4(), srs.GetName())
+ return srs
+
+def getExtent(extent : Optional[tuple[float, float, float, float]],
+ srs : Optional[osr.SpatialReference] = None) -> tuple[ogr.Geometry, ogr.Geometry]:
+ """Convert extent (minX, minY, maxX, maxY) into a polygon and assign the
+ given SRS. Return a pair with the densified and non-densified extent.
+ Like apps/ogr2ogr_lib.cpp, the former is obtained by segmentizing the
+ polygon to make sure it is sufficiently densified when transforming to
+ source layer SRS for spatial filtering."""
+
+ if extent is None:
+ return None, None
+
+ if not isinstance(extent, tuple) or len(extent) != 4:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception(f'Invalid extent {extent}')
+ if srs is None:
+ # pylint: disable-next=broad-exception-raised
+ raise Exception('Configured extent but no SRS')
+
+ logging.debug('Configured extent in %s: %s', srs.GetName(),
+ ', '.join(map(str, extent)))
+
+ ring = ogr.Geometry(ogr.wkbLinearRing)
+ ring.AddPoint_2D(extent[0], extent[1])
+ ring.AddPoint_2D(extent[2], extent[1])
+ ring.AddPoint_2D(extent[2], extent[3])
+ ring.AddPoint_2D(extent[0], extent[3])
+ ring.AddPoint_2D(extent[0], extent[1])
+
+ polygon = ogr.Geometry(ogr.wkbPolygon)
+ polygon.AddGeometry(ring)
+
+ # we expressed extent as minX, minY, maxX, maxY (easting/northing
+ # ordered, i.e., in traditional GIS order)
+ srs2 = srs.Clone()
+ srs2.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
+ polygon.AssignSpatialReference(srs2)
+ if not srs2.IsSame(srs):
+ polygon.TransformTo(srs)
+
+ # densify the rectangle to avoid issues when reprojecting to the
+ # source layer SRS, cf. apps/ogr2ogr_lib.cpp:ApplySpatialFilter()
+ polygon_dense = polygon.Clone()
+ segment_distance_metre = 10 * 1000
+ if srs.IsGeographic():
+ dfMaxLength = segment_distance_metre / math.radians(srs.GetSemiMajor())
+ polygon_dense.Segmentize(dfMaxLength)
+ elif srs.IsProjected():
+ dfMaxLength = segment_distance_metre / srs.GetLinearUnits()
+ polygon_dense.Segmentize(dfMaxLength)
+
+ return polygon_dense, polygon
+
+def formatTZFlag(tzFlag : int) -> str:
+ """Pretty-print timezone flag, cf. ogr/ogrutils.cpp:OGRGetISO8601DateTime()"""
+ if tzFlag is None:
+ raise RuntimeError('printTimeZone(None)')
+ if tzFlag == ogr.TZFLAG_UNKNOWN:
+ return 'none'
+ if tzFlag == ogr.TZFLAG_LOCALTIME:
+ return 'local'
+ if tzFlag == ogr.TZFLAG_UTC:
+ return 'UTC'
+
+ tzOffset = abs(tzFlag - 100) * 15
+ tzHour = int(tzOffset / 60)
+ tzMinute = int(tzOffset % 60)
+ tzSign = '+' if tzFlag > 100 else '-'
+ return f'{tzSign}{tzHour:02}{tzMinute:02}'
+
+def fromGeomTypeName(name : str) -> int:
+ """Parse a Geometry type name, cf. ogr/ogrgeometry.cpp:OGRFromOGCGeomType()"""
+ name = name.upper()
+
+ isMeasured = False
+ if name.endswith('M'):
+ isMeasured = True
+ name = name.removesuffix('M')
+
+ convertTo3D = False
+ if name.endswith('Z'):
+ convertTo3D = True
+ name = name.removesuffix('Z')
+
+ if name == 'POINT':
+ eGType = ogr.wkbPoint
+ elif name == 'LINESTRING':
+ eGType = ogr.wkbLineString
+ elif name == 'POLYGON':
+ eGType = ogr.wkbPolygon
+ elif name == 'MULTIPOINT':
+ eGType = ogr.wkbMultiPoint
+ elif name == 'MULTILINESTRING':
+ eGType = ogr.wkbMultiLineString
+ elif name == 'MULTIPOLYGON':
+ eGType = ogr.wkbMultiPolygon
+ elif name == 'GEOMETRYCOLLECTION':
+ eGType = ogr.wkbGeometryCollection
+ elif name == 'CIRCULARSTRING':
+ eGType = ogr.wkbCircularString
+ elif name == 'COMPOUNDCURVE':
+ eGType = ogr.wkbCompoundCurve
+ elif name == 'CURVEPOLYGON':
+ eGType = ogr.wkbCurvePolygon
+ elif name == 'MULTICURVE':
+ eGType = ogr.wkbMultiCurve
+ elif name == 'MULTISURFACE':
+ eGType = ogr.wkbMultiSurface
+ elif name == 'TRIANGLE':
+ eGType = ogr.wkbTriangle
+ elif name == 'POLYHEDRALSURFACE':
+ eGType = ogr.wkbPolyhedralSurface
+ elif name == 'TIN':
+ eGType = ogr.wkbTIN
+ elif name == 'CURVE':
+ eGType = ogr.wkbCurve
+ elif name == 'SURFACE':
+ eGType = ogr.wkbSurface
+ else:
+ eGType = ogr.wkbUnknown
+
+ if convertTo3D:
+ eGType = ogr.GT_SetZ(eGType)
+
+ if isMeasured:
+ eGType = ogr.GT_SetM(eGType)
+
+ return eGType
+
+def parseGeomType(name : str|None) -> int:
+ """Parse geometry type, cf. ogr2ogr_lib.cpp"""
+ if name is None:
+ return ogr.wkbUnknown
+ name2 = name.upper()
+
+ is3D = False
+ if name2.endswith('25D'):
+ name2 = name2[:-3] # alias
+ is3D = True
+ elif name2.endswith('Z'):
+ name2 = name2[:-1]
+ is3D = True
+
+ if name2 == 'NONE':
+ eGType = ogr.wkbNone
+ elif name2 == 'GEOMETRY':
+ eGType = ogr.wkbUnknown
+ else:
+ eGType = fromGeomTypeName(name2)
+ if eGType == ogr.wkbUnknown:
+ raise BadConfiguration(f'Unknown geometry type "{name}"')
+
+ if eGType != ogr.wkbNone and is3D:
+ eGType = ogr.GT_SetZ(eGType)
+
+ return eGType
+
+
+def parseFieldType(name : str) -> int:
+ """Parse field type, cf. ogr/ogr_core.h's enum OGRFieldType"""
+ # pylint: disable=too-many-return-statements
+ if name is None:
+ raise RuntimeError('parseFieldType(None)')
+
+ name2 = name.lower()
+ if name2 == 'integer':
+ # simple 32bit integer
+ return ogr.OFTInteger
+ if name2 == 'integerlist':
+ # List of 32bit integers
+ return ogr.OFTIntegerList
+ if name2 == 'real':
+ # Double Precision floating point
+ return ogr.OFTReal
+ if name2 == 'reallist':
+ # List of doubles
+ return ogr.OFTRealList
+ if name2 == 'string':
+ # String of ASCII chars
+ return ogr.OFTString
+ if name2 == 'stringlist':
+ # Array of strings
+ return ogr.OFTStringList
+ if name2 == 'binary':
+ # Raw Binary data
+ return ogr.OFTBinary
+ if name2 == 'date':
+ # Date
+ return ogr.OFTDate
+ if name2 == 'time':
+ # Time
+ return ogr.OFTTime
+ if name2 == 'datetime':
+ # Date and Time
+ return ogr.OFTDateTime
+ if name2 == 'integer64':
+ # Single 64bit integer
+ return ogr.OFTInteger64
+ if name2 == 'integer64list':
+ # List of 64bit integers
+ return ogr.OFTInteger64List
+ raise BadConfiguration(f'Unknown field type "{name}"')
+
+def parseSubFieldType(name : str) -> int:
+ """Parse subfield type, cf. ogr/ogr_core.h's enum OGRFieldSubType"""
+ if name is None:
+ raise RuntimeError('parseSubFieldType(None)')
+ name2 = name.lower()
+ if name2 == 'none':
+ # No subtype. This is the default value.
+ return ogr.OFSTNone
+ if name2 == 'bool':
+ # Boolean integer. Only valid for OFTInteger and OFTIntegerList.
+ return ogr.OFSTBoolean
+ if name2 == 'int16':
+ # Signed 16-bit integer. Only valid for OFTInteger and OFTIntegerList.
+ return ogr.OFSTInt16
+ if name2 == 'float32':
+ # Single precision (32 bit) floating point. Only valid for OFTReal and OFTRealList.
+ return ogr.OFSTFloat32
+ if name2 == 'json':
+ # JSON content. Only valid for OFTString.
+ return ogr.OFSTJSON
+ if name2 == 'uuid':
+ # UUID string representation. Only valid for OFTString.
+ return ogr.OFSTUUID
+ raise BadConfiguration(f'Unknown field subtype "{name}"')
+
+TZ_RE = re.compile(r'(?:UTC\b)?([\+\-]?)([0-9][0-9]):?([0-9][0-9])', flags=re.IGNORECASE)
+def parseTimeZone(tz : str) -> int:
+ """Parse timezone."""
+ if tz is None:
+ raise RuntimeError('parseTimeZone(None)')
+ tz2 = tz.lower()
+ if tz2 == 'none':
+ return ogr.TZFLAG_UNKNOWN
+ if tz2 == 'local':
+ return ogr.TZFLAG_LOCALTIME
+ if tz2 in ('utc', 'gmt'):
+ return ogr.TZFLAG_UTC
+
+ m = TZ_RE.fullmatch(tz)
+ if m is None:
+ raise BadConfiguration(f'Invalid timezone "{tz}"')
+ tzSign = m.group(1)
+ tzHour = int(m.group(2))
+ tzMinute = int(m.group(3))
+ if tzHour > 14 or tzMinute >= 60 or tzMinute % 15 != 0:
+ raise BadConfiguration(f'Invalid timezone "{tz}"')
+ tzFlag = tzHour*4 + int(tzMinute/15)
+ if tzSign == '-':
+ tzFlag = 100 - tzFlag
+ else:
+ tzFlag += 100
+ return tzFlag