From 0070da1fc157bf8bf9f2703bea5505843c39bde6 Mon Sep 17 00:00:00 2001 From: Guilhem Moulin Date: Mon, 23 Oct 2023 21:55:28 +0200 Subject: Add new script to list and filter observations from Artdatabanken. Usage example: ./list-observations --redlisted --max-distance=250 --geometry=/tmp/nygruva.geojson /tmp/nygruva/fynd.gpkg --- list-observations | 432 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 432 insertions(+) create mode 100755 list-observations diff --git a/list-observations b/list-observations new file mode 100755 index 0000000..be8961f --- /dev/null +++ b/list-observations @@ -0,0 +1,432 @@ +#!/usr/bin/python3 + +#---------------------------------------------------------------------- +# List and filter observations from Artdatabanken +# Copyright © 2023 Guilhem Moulin +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#---------------------------------------------------------------------- + +import argparse +from datetime import datetime +import time +import locale +from pathlib import Path +from osgeo import gdal, ogr, osr +gdal.UseExceptions() + +target_srs = osr.SpatialReference() +target_srs.ImportFromEPSG(3006) +target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) + +locale.setlocale(locale.LC_TIME, 'sv_SE.UTF-8') + +programName = 'list-observations' + +def ePath(v): + return Path(v).expanduser() + + +class geometryAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + super().__init__(option_strings, dest, **kwargs) + def __call__(self, parser, namespace, value, option_string=None): + items = getattr(namespace, self.dest, None) + if items is None: + items = [value] + else: + items = items.copy() + items.append(value) + setattr(namespace, self.dest, items) + +class redlistedAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + super().__init__(option_strings, dest, nargs=nargs, **kwargs) + def __call__(self, parser, namespace, values, option_string=None): + items = getattr(namespace, self.dest, None) + if values is None: + items = 'TaxonIsRedlisted' + else: + values = values.split(',') + for v in values: + if v not in ['EX', 'EW', 'RE', 'CR', 'EN', 'VU', 'NT', 'LC', 'DD', 'NE', 'NA']: + raise Exception(f'{v}: Invalid redlist category') + if items is None: + items = values + else: + items = items.copy() + items.extend(values) + setattr(namespace, self.dest, items) + +class taxonAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + super().__init__(option_strings, dest, nargs=nargs, **kwargs) + def __call__(self, parser, namespace, values, option_string=None): + items = getattr(namespace, self.dest, None) + values = values.lower().split(',') + if items is None: + items = values + else: + items = items.copy() + items.extend(values) + setattr(namespace, self.dest, items) + +parser = argparse.ArgumentParser( + description='List and filter observations from Artdatabanken.', + prog = programName, +) +parser.add_argument('--geometry', type=ePath, metavar='FILE', action=geometryAction, + help='Geometry file of interest') +parser.add_argument('--max-uncertainty', metavar='N', type=int, default=250, + help='Maximum coordinate uncertainty in meters (default: %(default)s)') +parser.add_argument('--max-distance', metavar='N', type=int, + help='Maximum distance in meters from target geometry') +parser.add_argument('--since', metavar='YYYY-MM-DD', + help='Start date for observations in ISO 8601 format') +parser.add_argument('--until', metavar='YYYY-MM-DD', + help='End date for observations in ISO 8601 format') +parser.add_argument('--protected', action='store_true', + help='Only consider taxons that are protected by law') +parser.add_argument('--redlisted', nargs='?', metavar='[NT,…]', action=redlistedAction, + help='Only consider redlisted taxons') +parser.add_argument('--taxon', metavar='NAME', action=taxonAction, + help='Only consider these taxons') +parser.add_argument('observations', type=ePath, metavar='OBSERVATIONS_FILE', + help='Observations file') + +args = parser.parse_args() + +for k in ['since', 'until']: + v = getattr(args, k, None) + if v is not None: + d = datetime.fromisoformat(v) + setattr(args, k, d) + +geometry = None +for path in args.geometry: + path = path.as_posix() + ds = ogr.Open(path, update=0) + + layer = None + for i in range(ds.GetLayerCount()): + lyr = ds.GetLayerByIndex(i) + if lyr.GetGeomType() in [ogr.wkbPolygon, + ogr.wkbMultiPolygon, + ogr.wkbPolygonM, + ogr.wkbMultiPolygonM, + ogr.wkbPolygonZM, + ogr.wkbMultiPolygonZM, + ogr.wkbPolygon25D, + ogr.wkbMultiPolygon25D]: + layer = lyr + break + if layer is None: + raise Exception(f'{path}: does not have any layer of geometry type Polygon') + + source_srs = layer.GetSpatialRef() + transform_srs = osr.CoordinateTransformation(source_srs, target_srs) + + feature = layer.GetNextFeature() + while feature is not None: + geom = feature.GetGeometryRef() + geom = geom.Clone() + geom.FlattenTo2D() + geom.Transform(transform_srs) + + if geometry is None: + geometry = ogr.Geometry(ogr.wkbMultiPolygon) + + geometryType = geom.GetGeometryType() + if geometryType == ogr.wkbPolygon: + geometry.AddGeometry(geom) + elif geometryType == ogr.wkbMultiPolygon: + for i in range(geom.GetGeometryCount()): + geom2 = geom.GetGeometryRef(i) + geometry.AddGeometry(geom2) + else: + raise Exception(f'Unsuported geometry {geometryType}') + feature = layer.GetNextFeature() + layer = None + ds = None + +if geometry is not None: + geometry = geometry.UnionCascaded() + + +path = args.observations.as_posix() +ds = ogr.Open(path, update=0) + +layer = None +for i in range(ds.GetLayerCount()): + lyr = ds.GetLayerByIndex(i) + if lyr.GetGeomType() in [ogr.wkbPoint, ogr.wkbPointM, ogr.wkbMultiPointM, ogr.wkbPointZM, ogr.wkbMultiPointZM, ogr.wkbPoint25D]: + layer = lyr + break +if layer is None: + raise Exception(f'{path}: does not have any layer of geometry type Point') + +source_srs = layer.GetSpatialRef() +transform_srs = osr.CoordinateTransformation(source_srs, target_srs) + +def getField(feat, name): + idx = feature.GetFieldIndex(name) + if idx is not None and idx >= 0: + return feature.GetField(idx) + return None +def getFieldAsDateTime(feat, name): + idx = feature.GetFieldIndex(name) + if idx is not None and idx >= 0: + v = feature.GetFieldAsDateTime(idx) + t = (v[0], v[1], v[2], v[3], v[4], int(v[5]), v[6], -1, -1) + return datetime.fromtimestamp(time.mktime(t)) + return None + +sum_count = 0 +summary = {} +feature = layer.GetNextFeature() +while feature is not None: + v = feature.GetField('IsPositiveObservation') + if v is not None and not v: + feature = layer.GetNextFeature() + continue + v = feature.GetField('IsNaturalOccurrence') + if v is not None and not v: + feature = layer.GetNextFeature() + continue + + uncertainty = feature.GetField('CoordinateUncertaintyInMeters') + if uncertainty is not None and uncertainty > args.max_uncertainty: + feature = layer.GetNextFeature() + continue + + vernacularName = getField(feature, 'VernacularName') + if vernacularName is None: + raise Exception('Missing name') + if args.taxon is not None and vernacularName.lower() not in args.taxon: + feature = layer.GetNextFeature() + continue + + protectedByLaw = feature.GetField('ProtectedByLaw') + if args.protected and (protectedByLaw is None or not protectedByLaw): + feature = layer.GetNextFeature() + continue + + taxonIsRedlisted = feature.GetField('TaxonIsRedlisted') + redlistCategory = getField(feature, 'RedlistCategory') + if args.redlisted is not None: + if args.redlisted == 'TaxonIsRedlisted': + if taxonIsRedlisted is None or not taxonIsRedlisted: + feature = layer.GetNextFeature() + continue + elif type(args.redlisted) == list: + if redlistCategory is None or redlistCategory not in args.redlisted: + feature = layer.GetNextFeature() + continue + + startDate = getFieldAsDateTime(feature, 'StartDate') + if args.until is not None and (startDate is None or args.until < startDate): + feature = layer.GetNextFeature() + continue + + endDate = getFieldAsDateTime(feature, 'EndDate') + if args.since is not None and (endDate is None or endDate < args.since): + feature = layer.GetNextFeature() + continue + + geom = feature.GetGeometryRef() + geom = geom.Clone() + geom.FlattenTo2D() + geom.Transform(transform_srs) + + distance = None + if geometry is not None and not geometry.Contains(geom): + distance = geom.Distance(geometry) + if distance == -1: + raise Exception('distance') + max_distance = 0 if args.max_distance is None else args.max_distance + if uncertainty is not None: + max_distance += uncertainty + if max_distance < distance: + feature = layer.GetNextFeature() + continue + + if sum_count > 0: + print('') + + print(f'Artnamn: {vernacularName.title()}') + + name = vernacularName + if taxonIsRedlisted is not None and taxonIsRedlisted and redlistCategory is not None: + name += f' ({redlistCategory})' + n = summary.get(name, 0) + summary[name] = n + 1 + + v = getField(feature, 'OrganismQuantityInt') + if v is None: + v = 'Noterad' + else: + v = str(v) + unit = getField(feature, 'OrganismQuantityUnit') + if unit is not None and unit != '': + v += ' ' + unit + print(f'Antal & enhet: {v}') + + v = getField(feature, 'Sex') + if v is not None: + print(f'Kön: {v}') + + v = getField(feature, 'LifeStage') + if v is not None: + print(f'Ålder/stadium: {v}') + + v = getField(feature, 'Activity') + if v is not None: + print(f'Aktivitet: {v}') + + v = getField(feature, 'DiscoveryMethod') + if v is not None: + print(f'Metod: {v}') + + v = getField(feature, 'ReportedBy') + if ',' in v: + print(f'Rapportörer: {v}') + else: + print(f'Rapportör: {v}') + v = getField(feature, 'RecordedBy') + if ',' in v: + print(f'Observatörer: {v}') + else: + print(f'Observatör: {v}') + + print(f'Startdatum: {startDate.strftime("%-d %B %Y %H:%M")}') + print(f'Slutdatum: {endDate.strftime("%-d %B %Y %H:%M")}') + + v = getField(feature, 'Projects') + if v is not None: + print(f'Projects: {v}') + + v = getField(feature, 'OccurrenceRemarks') + if v is not None: + print(f'Kommentar: {v}') + + v = getField(feature, 'DatasetName') + if v is not None: + print(f'Källa: {v}') + v = getField(feature, 'Url') + if v is not None: + print(f'URL: {v}') + + if geom.GetGeometryType() != ogr.wkbPoint: + raise Exception('Not a point') + x, y = geom.GetPoint_2D(i) + v = getField(feature, 'Locality') + if v is not None: + print(f'Lokalnamn: {v}') + coords = f'Koordinater: N{y:.0f}, Ö{x:.0f} (±{uncertainty:.0f}m) {target_srs.GetName()}' + + if geometry is not None: + if distance is None: + dist = 'i' + else: + dist = f'{distance:.0f}m från' + coords += f'; {dist} målgeometrin' + print(coords) + + if redlistCategory is None: + v = '—' + elif redlistCategory == 'EX': + v = 'Utdöd (EX)' + elif redlistCategory == 'EW': + v = 'Utdöd i vilt tillstånd (EW)' + elif redlistCategory == 'RE': + v = 'Nationellt utdöd (RE)' + elif redlistCategory == 'CR': + v = 'Akut hotad (CR)' + elif redlistCategory == 'EN': + v = 'Starkt hotad (EN)' + elif redlistCategory == 'VU': + v = 'Sårbar (VU)' + elif redlistCategory == 'NT': + v = 'Nära hotad (NT)' + elif redlistCategory == 'LC': + v = 'Livskraftig (LC)' + elif redlistCategory == 'DD': + v = 'Kunskapsbrist (DD)' + elif redlistCategory == 'NE': + v = 'Ej bedömd (NE)' + elif redlistCategory == 'NA': + v = 'Ej tillämplig (NA)' + else: + v = redlistCategory + print(f'Rödlistning: {v}') + + v = [] + if protectedByLaw is not None and protectedByLaw: + v.append('nationellt fridlyst') + v2 = getField(feature, 'TaxonIsInvasiveInSweden') + if v2 is not None and v2: + v.append('främmande i Sverige') + v2 = getField(feature, 'TaxonIsInvasiveEuRegulation') + if v2 is not None and v2: + v.append('invasiv enligt EU-förordning 1143/2014') + v2 = getField(feature, 'Natura2000HabitatsDirective') + if v2 is not None and v2: + v2 = [] + v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle2') + if v3 is not None and v3: + v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle2PrioritySpecie') + if v3 is not None and v3: + v2.append('II (prioriterad art)') + else: + v2.append('II') + v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle4') + if v3 is not None and v3: + v2.append('IV') + v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle5') + if v3 is not None and v3: + v2.append('V') + if len(v2) == 0: + v.append('habitatdirektivet') + elif len(v2) == 1: + v.append('habitatdirektivets bilaga ' + v2[0]) + else: + v.append('habitatdirektivets bilagor ' + ', '.join(v2[0:-1]) + ' & ' + v2[-1]) + v2 = getField(feature, 'TaxonIsSignalSpecie') + if v2 is not None and v2: + v.append('signalart enligt Skogsstyrelsen') + v2 = getField(feature, 'BirdDirective') + if v2 is not None and v2: + v.append('fågeldirektivet') + if len(v) > 0: + v[0] = v[0][0].upper() + v[0][1:] + v = '; '.join(v) + else: + v = '—' + print(f'Naturvård: {v}') + + sum_count += 1 + feature = layer.GetNextFeature() + +if sum_count == 0: + print('No matching observations found!') + exit(0) + +max_count = max(summary.values()) +width = len(str(max_count)) + +print('\n\nSummary\n-------') +for name, n in sorted(summary.items(), key=lambda x: (-x[1], x[0])): + print('{0:{width}d}× {1}'.format(n, name, width=width)) +print('-' * (width+1)) +print(f'{sum_count} matching observations') -- cgit v1.2.3