#!/usr/bin/python3 #---------------------------------------------------------------------- # List and filter observations from Artdatabanken # Copyright © 2023 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- import argparse from datetime import datetime import time import locale from pathlib import Path from osgeo import gdal, ogr, osr gdal.UseExceptions() target_srs = osr.SpatialReference() target_srs.ImportFromEPSG(3006) target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) locale.setlocale(locale.LC_TIME, 'sv_SE.UTF-8') programName = 'list-observations' def ePath(v): return Path(v).expanduser() class geometryAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, value, option_string=None): items = getattr(namespace, self.dest, None) if items is None: items = [value] else: items = items.copy() items.append(value) setattr(namespace, self.dest, items) class redlistedAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) if values is None: items = 'TaxonIsRedlisted' else: values = values.split(',') for v in values: if v not in ['EX', 'EW', 'RE', 'CR', 'EN', 'VU', 'NT', 'LC', 'DD', 'NE', 'NA']: raise Exception(f'{v}: Invalid redlist category') if items is None: items = values else: items = items.copy() items.extend(values) setattr(namespace, self.dest, items) class taxonAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) values = values.lower().split(',') if items is None: items = values else: items = items.copy() items.extend(values) setattr(namespace, self.dest, items) parser = argparse.ArgumentParser( description='List and filter observations from Artdatabanken.', prog = programName, ) parser.add_argument('--geometry', type=ePath, metavar='FILE', action=geometryAction, help='Geometry file of interest') parser.add_argument('--max-uncertainty', metavar='N', type=int, default=250, help='Maximum coordinate uncertainty in meters (default: %(default)s)') parser.add_argument('--max-distance', metavar='N', type=int, help='Maximum distance in meters from target geometry') parser.add_argument('--since', metavar='YYYY-MM-DD', help='Start date for observations in ISO 8601 format') parser.add_argument('--until', metavar='YYYY-MM-DD', help='End date for observations in ISO 8601 format') parser.add_argument('--protected', action='store_true', help='Only consider taxons that are protected by law') parser.add_argument('--redlisted', nargs='?', metavar='[NT,…]', action=redlistedAction, help='Only consider redlisted taxons') parser.add_argument('--taxon', metavar='NAME', action=taxonAction, help='Only consider these taxons') parser.add_argument('observations', type=ePath, metavar='OBSERVATIONS_FILE', help='Observations file') args = parser.parse_args() for k in ['since', 'until']: v = getattr(args, k, None) if v is not None: d = datetime.fromisoformat(v) setattr(args, k, d) geometry = None if args.geometry is not None: for path in args.geometry: path = path.as_posix() ds = ogr.Open(path, update=0) layer = None for i in range(ds.GetLayerCount()): lyr = ds.GetLayerByIndex(i) if lyr.GetGeomType() in [ogr.wkbPolygon, ogr.wkbMultiPolygon, ogr.wkbPolygonM, ogr.wkbMultiPolygonM, ogr.wkbPolygonZM, ogr.wkbMultiPolygonZM, ogr.wkbPolygon25D, ogr.wkbMultiPolygon25D]: layer = lyr break if layer is None: raise Exception(f'{path}: does not have any layer of geometry type Polygon') source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) feature = layer.GetNextFeature() while feature is not None: geom = feature.GetGeometryRef() geom = geom.Clone() geom.FlattenTo2D() geom.Transform(transform_srs) if geometry is None: geometry = ogr.Geometry(ogr.wkbMultiPolygon) geometryType = geom.GetGeometryType() if geometryType == ogr.wkbPolygon: geometry.AddGeometry(geom) elif geometryType == ogr.wkbMultiPolygon: for i in range(geom.GetGeometryCount()): geom2 = geom.GetGeometryRef(i) geometry.AddGeometry(geom2) else: raise Exception(f'Unsuported geometry {geometryType}') feature = layer.GetNextFeature() layer = None ds = None geometry = geometry.UnionCascaded() path = args.observations.as_posix() ds = ogr.Open(path, update=0) layer = None for i in range(ds.GetLayerCount()): lyr = ds.GetLayerByIndex(i) if lyr.GetGeomType() in [ogr.wkbPoint, ogr.wkbPointM, ogr.wkbMultiPointM, ogr.wkbPointZM, ogr.wkbMultiPointZM, ogr.wkbPoint25D]: layer = lyr break if layer is None: raise Exception(f'{path}: does not have any layer of geometry type Point') source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) def getField(feat, name): idx = feature.GetFieldIndex(name) if idx is not None and idx >= 0: return feature.GetField(idx) return None def getFieldAsDateTime(feat, name): idx = feature.GetFieldIndex(name) if idx is not None and idx >= 0: v = feature.GetFieldAsDateTime(idx) t = (v[0], v[1], v[2], v[3], v[4], int(v[5]), v[6], -1, -1) return datetime.fromtimestamp(time.mktime(t)) return None sum_count = 0 summary = {} feature = layer.GetNextFeature() while feature is not None: v = feature.GetField('IsPositiveObservation') if v is not None and not v: feature = layer.GetNextFeature() continue v = feature.GetField('IsNaturalOccurrence') if v is not None and not v: feature = layer.GetNextFeature() continue uncertainty = feature.GetField('CoordinateUncertaintyInMeters') if uncertainty is not None and uncertainty > args.max_uncertainty: feature = layer.GetNextFeature() continue vernacularName = getField(feature, 'VernacularName') if vernacularName is None: raise Exception('Missing name') if args.taxon is not None and vernacularName.lower() not in args.taxon: feature = layer.GetNextFeature() continue protectedByLaw = feature.GetField('ProtectedByLaw') if args.protected and (protectedByLaw is None or not protectedByLaw): feature = layer.GetNextFeature() continue taxonIsRedlisted = feature.GetField('TaxonIsRedlisted') redlistCategory = getField(feature, 'RedlistCategory') if args.redlisted is not None: if args.redlisted == 'TaxonIsRedlisted': if taxonIsRedlisted is None or not taxonIsRedlisted: feature = layer.GetNextFeature() continue elif type(args.redlisted) == list: if redlistCategory is None or redlistCategory not in args.redlisted: feature = layer.GetNextFeature() continue startDate = getFieldAsDateTime(feature, 'StartDate') if args.until is not None and (startDate is None or args.until < startDate): feature = layer.GetNextFeature() continue endDate = getFieldAsDateTime(feature, 'EndDate') if args.since is not None and (endDate is None or endDate < args.since): feature = layer.GetNextFeature() continue geom = feature.GetGeometryRef() geom = geom.Clone() geom.FlattenTo2D() geom.Transform(transform_srs) distance = None if geometry is not None and not geometry.Contains(geom): distance = geom.Distance(geometry) if distance == -1: raise Exception('distance') max_distance = 0 if args.max_distance is None else args.max_distance if uncertainty is not None: max_distance += uncertainty if max_distance < distance: feature = layer.GetNextFeature() continue if sum_count > 0: print('') print(f'Artnamn: {vernacularName.title()}') name = vernacularName if taxonIsRedlisted is not None and taxonIsRedlisted and redlistCategory is not None: name += f' ({redlistCategory})' n = summary.get(name, 0) summary[name] = n + 1 v = getField(feature, 'OrganismQuantityInt') if v is None: v = 'Noterad' else: v = str(v) unit = getField(feature, 'OrganismQuantityUnit') if unit is not None and unit != '': v += ' ' + unit print(f'Antal & enhet: {v}') v = getField(feature, 'Sex') if v is not None: print(f'Kön: {v}') v = getField(feature, 'LifeStage') if v is not None: print(f'Ålder/stadium: {v}') v = getField(feature, 'Activity') if v is not None: print(f'Aktivitet: {v}') v = getField(feature, 'DiscoveryMethod') if v is not None: print(f'Metod: {v}') v = getField(feature, 'ReportedBy') if ',' in v: print(f'Rapportörer: {v}') else: print(f'Rapportör: {v}') v = getField(feature, 'RecordedBy') if ',' in v: print(f'Observatörer: {v}') else: print(f'Observatör: {v}') print(f'Startdatum: {startDate.strftime("%-d %B %Y %H:%M")}') print(f'Slutdatum: {endDate.strftime("%-d %B %Y %H:%M")}') v = getField(feature, 'Projects') if v is not None: print(f'Projects: {v}') v = getField(feature, 'OccurrenceRemarks') if v is not None: print(f'Kommentar: {v}') v = getField(feature, 'DatasetName') if v is not None: print(f'Källa: {v}') v = getField(feature, 'Url') if v is not None: print(f'URL: {v}') if geom.GetGeometryType() != ogr.wkbPoint: raise Exception('Not a point') x, y = geom.GetPoint_2D(i) v = getField(feature, 'Locality') if v is not None: print(f'Lokalnamn: {v}') coords = f'Koordinater: N{y:.0f}, Ö{x:.0f} (±{uncertainty:.0f}m) {target_srs.GetName()}' if geometry is not None: if distance is None: dist = 'i' else: dist = f'{distance:.0f}m från' coords += f'; {dist} målgeometrin' print(coords) if redlistCategory is None: v = '—' elif redlistCategory == 'EX': v = 'Utdöd (EX)' elif redlistCategory == 'EW': v = 'Utdöd i vilt tillstånd (EW)' elif redlistCategory == 'RE': v = 'Nationellt utdöd (RE)' elif redlistCategory == 'CR': v = 'Akut hotad (CR)' elif redlistCategory == 'EN': v = 'Starkt hotad (EN)' elif redlistCategory == 'VU': v = 'Sårbar (VU)' elif redlistCategory == 'NT': v = 'Nära hotad (NT)' elif redlistCategory == 'LC': v = 'Livskraftig (LC)' elif redlistCategory == 'DD': v = 'Kunskapsbrist (DD)' elif redlistCategory == 'NE': v = 'Ej bedömd (NE)' elif redlistCategory == 'NA': v = 'Ej tillämplig (NA)' else: v = redlistCategory print(f'Rödlistning: {v}') v = [] if protectedByLaw is not None and protectedByLaw: v.append('nationellt fridlyst') v2 = getField(feature, 'TaxonIsInvasiveInSweden') if v2 is not None and v2: v.append('främmande i Sverige') v2 = getField(feature, 'TaxonIsInvasiveEuRegulation') if v2 is not None and v2: v.append('invasiv enligt EU-förordning 1143/2014') v2 = getField(feature, 'Natura2000HabitatsDirective') if v2 is not None and v2: v2 = [] v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle2') if v3 is not None and v3: v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle2PrioritySpecie') if v3 is not None and v3: v2.append('II (prioriterad art)') else: v2.append('II') v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle4') if v3 is not None and v3: v2.append('IV') v3 = getField(feature, 'Natura2000HabitatsDirectiveArticle5') if v3 is not None and v3: v2.append('V') if len(v2) == 0: v.append('habitatdirektivet') elif len(v2) == 1: v.append('habitatdirektivets bilaga ' + v2[0]) else: v.append('habitatdirektivets bilagor ' + ', '.join(v2[0:-1]) + ' & ' + v2[-1]) v2 = getField(feature, 'TaxonIsSignalSpecie') if v2 is not None and v2: v.append('signalart enligt Skogsstyrelsen') v2 = getField(feature, 'BirdDirective') if v2 is not None and v2: v.append('fågeldirektivet') if len(v) > 0: v[0] = v[0][0].upper() + v[0][1:] v = '; '.join(v) else: v = '—' print(f'Naturvård: {v}') sum_count += 1 feature = layer.GetNextFeature() if sum_count == 0: print('No matching observations found!') exit(0) max_count = max(summary.values()) width = len(str(max_count)) print('\n\nSummary\n-------') for name, n in sorted(summary.items(), key=lambda x: (-x[1], x[0])): print('{0:{width}d}× {1}'.format(n, name, width=width)) print('-' * (width+1)) print(f'{sum_count} matching observations')