#!/usr/bin/python3 #---------------------------------------------------------------------- # List and filter observations from Artdatabanken # Copyright © 2023 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- import argparse from datetime import datetime import time import locale import re from pathlib import Path from osgeo import gdal, ogr, osr gdal.UseExceptions() target_srs = osr.SpatialReference() target_srs.ImportFromEPSG(3006) target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) locale.setlocale(locale.LC_TIME, 'sv_SE.UTF-8') programName = 'list-observations' def ePath(v): return Path(v).expanduser() class geometryAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, value, option_string=None): items = getattr(namespace, self.dest, None) if items is None: items = [value] else: items = items.copy() items.append(value) setattr(namespace, self.dest, items) class redlistedAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) if values is None: items = 'TaxonIsRedlisted' else: values = values.split(',') for v in values: if v not in ['EX', 'EW', 'RE', 'CR', 'EN', 'VU', 'NT', 'LC', 'DD', 'NE', 'NA']: raise Exception(f'{v}: Invalid redlist category') if items is None: items = values else: items = items.copy() items.extend(values) setattr(namespace, self.dest, items) class splitAndExtendAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) values = values.lower().split(',') if items is None: items = values else: items = items.copy() items.extend(values) setattr(namespace, self.dest, items) parser = argparse.ArgumentParser( description='List and filter observations from Artdatabanken.', prog = programName, usage='''%(prog)s [--geometry=FILE] [--max-distance=N] [--redlisted[=NT,VU,…]] OBSERVATIONS_FILE''' ) parser.add_argument('--geometry', type=ePath, metavar='FILE', action=geometryAction, help='Geometry file of interest') parser.add_argument('--max-uncertainty', metavar='N', type=int, default=250, help='Maximum coordinate uncertainty in meters (default: %(default)s)') parser.add_argument('--max-distance', metavar='N', type=int, help='Maximum distance in meters from target geometry') parser.add_argument('--since', metavar='YYYY-MM-DD', help='Start date for observations in ISO 8601 format') parser.add_argument('--until', metavar='YYYY-MM-DD', help='End date for observations in ISO 8601 format') parser.add_argument('--protected', action='store_true', help='Only consider taxons that are protected by law') parser.add_argument('--redlisted', nargs='?', metavar='NT,…', action=redlistedAction, help='Only consider redlisted taxons') parser.add_argument('--taxon', metavar='NAME', action=splitAndExtendAction, help='Only consider these taxons') parser.add_argument('--reporter', metavar='NAME', action=splitAndExtendAction, help='Only consider observation by these reporters') parser.add_argument('--observer', metavar='NAME', action=splitAndExtendAction, help='Only consider observation by these observers') parser.add_argument('observations', type=ePath, metavar='OBSERVATIONS_FILE', help='Observations file') args = parser.parse_args() for k in ['since', 'until']: v = getattr(args, k, None) if v is not None: d = datetime.fromisoformat(v) setattr(args, k, d) geometry = None if args.geometry is not None: for path in args.geometry: path = path.as_posix() ds = ogr.Open(path, update=0) layer = None for i in range(ds.GetLayerCount()): lyr = ds.GetLayerByIndex(i) if lyr.GetGeomType() in [ogr.wkbPolygon, ogr.wkbMultiPolygon, ogr.wkbPolygonM, ogr.wkbMultiPolygonM, ogr.wkbPolygonZM, ogr.wkbMultiPolygonZM, ogr.wkbPolygon25D, ogr.wkbMultiPolygon25D]: layer = lyr break if layer is None: raise Exception(f'{path}: does not have any layer of geometry type Polygon') source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) feature = layer.GetNextFeature() while feature is not None: geom = feature.GetGeometryRef() geom = geom.Clone() geom.FlattenTo2D() geom.Transform(transform_srs) if geometry is None: geometry = ogr.Geometry(ogr.wkbMultiPolygon) geometryType = geom.GetGeometryType() if geometryType == ogr.wkbPolygon: geometry.AddGeometry(geom) elif geometryType == ogr.wkbMultiPolygon: for i in range(geom.GetGeometryCount()): geom2 = geom.GetGeometryRef(i) geometry.AddGeometry(geom2) else: raise Exception(f'Unsuported geometry {geometryType}') feature = layer.GetNextFeature() layer = None ds = None geometry = geometry.UnionCascaded() path = args.observations.as_posix() ds = ogr.Open(path, update=0) layer = None for i in range(ds.GetLayerCount()): lyr = ds.GetLayerByIndex(i) if lyr.GetGeomType() in [ogr.wkbPoint, ogr.wkbPointM, ogr.wkbMultiPointM, ogr.wkbPointZM, ogr.wkbMultiPointZM, ogr.wkbPoint25D]: layer = lyr break if layer is None: raise Exception(f'{path}: does not have any layer of geometry type Point') source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) layerDefn = layer.GetLayerDefn() fieldCount = layerDefn.GetFieldCount() sum_count = 0 summary = {} listSeparator = re.compile(r',\s*') def printObservation(feature): global sum_count properties = {} for i in range(fieldCount): if not feature.IsFieldSet(i): continue fieldDefn = layerDefn.GetFieldDefn(i) if feature.IsFieldNull(i): val = None elif fieldDefn.type == ogr.OFTInteger: # booleans are there too val = feature.GetFieldAsInteger(i) elif fieldDefn.type == ogr.OFTString: val = feature.GetFieldAsString(i) elif fieldDefn.type == ogr.OFTDateTime: st = feature.GetFieldAsDateTime(i) t = (st[0], st[1], st[2], st[3], st[4], int(st[5]), st[6], -1, -1) val = datetime.fromtimestamp(time.mktime(t)) else: raise Exception(f'{fieldDefn.name}: Unknown field type {fieldDefn.type}') properties[fieldDefn.name] = val v = properties.get('IsPositiveObservation') if v is not None and not v: return v = properties.get('IsNaturalOccurrence') if v is not None and not v: return uncertainty = properties.get('CoordinateUncertaintyInMeters') if uncertainty is not None and uncertainty > args.max_uncertainty: return vernacularName = properties.get('VernacularName') if vernacularName is None: pass #raise Exception('Missing name') if args.taxon is not None: name = vernacularName.lower() if not any([sub in name for sub in args.taxon]): return protectedByLaw = properties.get('ProtectedByLaw') if args.protected and (protectedByLaw is None or not protectedByLaw): return taxonIsRedlisted = properties.get('TaxonIsRedlisted') redlistCategory = properties.get('RedlistCategory') if args.redlisted is not None: if args.redlisted == 'TaxonIsRedlisted': if taxonIsRedlisted is None or not taxonIsRedlisted: return elif type(args.redlisted) == list: if redlistCategory is None or redlistCategory not in args.redlisted: return reportedBy = properties.get('ReportedBy') if args.reporter is not None: found = False for r in listSeparator.split(reportedBy): r = r.lower() if any([sub in r for sub in args.reporter]): found = True break if not found: return recordedBy = properties.get('RecordedBy') if args.observer is not None: found = False for r in listSeparator.split(recordedBy): r = r.lower() if any([sub in r for sub in args.observer]): found = True break if not found: return startDate = properties.get('StartDate') if args.until is not None and (startDate is None or args.until < startDate): return endDate = properties.get('EndDate') if args.since is not None and (endDate is None or endDate < args.since): return geom = feature.GetGeometryRef() geom = geom.Clone() geom.FlattenTo2D() geom.Transform(transform_srs) distance = None if geometry is not None and not geometry.Contains(geom): distance = geom.Distance(geometry) if distance == -1: raise Exception('distance') max_distance = 0 if args.max_distance is None else args.max_distance if uncertainty is not None: max_distance += uncertainty if max_distance < distance: return if sum_count > 0: print('') if vernacularName is not None: print(f'Artnamn: {vernacularName.title()}') name = vernacularName if taxonIsRedlisted is not None and taxonIsRedlisted and redlistCategory is not None: name += f' ({redlistCategory})' n = summary.get(name, 0) summary[name] = n + 1 v = properties.get('OrganismQuantityInt') if v is None: v = 'Noterad' else: v = str(v) unit = properties.get('OrganismQuantityUnit') if unit is not None and unit != '': v += ' ' + unit print(f'Antal & enhet: {v}') v = properties.get('Sex') if v is not None: print(f'Kön: {v}') v = properties.get('LifeStage') if v is not None: print(f'Ålder/stadium: {v}') v = properties.get('Activity') if v is not None: print(f'Aktivitet: {v}') v = properties.get('DiscoveryMethod') if v is not None: print(f'Metod: {v}') if reportedBy is not None and ',' in reportedBy: print(f'Rapportörer: {reportedBy}') else: print(f'Rapportör: {reportedBy}') if recordedBy is not None and ',' in recordedBy: print(f'Observatörer: {recordedBy}') else: print(f'Observatör: {recordedBy}') print(f'Startdatum: {startDate.strftime("%-d %B %Y %H:%M")}') print(f'Slutdatum: {endDate.strftime("%-d %B %Y %H:%M")}') v = properties.get('Projects') if v is not None: print(f'Projects: {v}') v = properties.get('OccurrenceRemarks') if v is not None: print(f'Kommentar: {v}') v = properties.get('DatasetName') if v is not None: print(f'Källa: {v}') v = properties.get('Url') if v is not None: print(f'URL: {v}') if geom.GetGeometryType() != ogr.wkbPoint: raise Exception('Not a point') x, y = geom.GetPoint_2D(0) v = properties.get('Locality') if v is not None: print(f'Lokalnamn: {v}') coords = f'Koordinater: N{y:.0f}, Ö{x:.0f} (±{uncertainty:.0f}m) {target_srs.GetName()}' if geometry is not None: if distance is None: dist = 'i' else: dist = f'{distance:.0f}m från' coords += f'; {dist} målgeometrin' print(coords) if redlistCategory is None: v = '—' elif redlistCategory == 'EX': v = 'Utdöd (EX)' elif redlistCategory == 'EW': v = 'Utdöd i vilt tillstånd (EW)' elif redlistCategory == 'RE': v = 'Nationellt utdöd (RE)' elif redlistCategory == 'CR': v = 'Akut hotad (CR)' elif redlistCategory == 'EN': v = 'Starkt hotad (EN)' elif redlistCategory == 'VU': v = 'Sårbar (VU)' elif redlistCategory == 'NT': v = 'Nära hotad (NT)' elif redlistCategory == 'LC': v = 'Livskraftig (LC)' elif redlistCategory == 'DD': v = 'Kunskapsbrist (DD)' elif redlistCategory == 'NE': v = 'Ej bedömd (NE)' elif redlistCategory == 'NA': v = 'Ej tillämplig (NA)' else: v = redlistCategory print(f'Rödlistning: {v}') v = [] if protectedByLaw is not None and protectedByLaw: v.append('nationellt fridlyst') v2 = properties.get('TaxonIsInvasiveInSweden') if v2 is not None and v2: v.append('främmande i Sverige') v2 = properties.get('TaxonIsInvasiveEuRegulation') if v2 is not None and v2: v.append('invasiv enligt EU-förordning 1143/2014') v2 = properties.get('Natura2000HabitatsDirective') if v2 is not None and v2: v2 = [] v3 = properties.get('Natura2000HabitatsDirectiveArticle2') if v3 is not None and v3: v3 = properties.get('Natura2000HabitatsDirectiveArticle2PrioritySpecie') if v3 is not None and v3: v2.append('II (prioriterad art)') else: v2.append('II') v3 = properties.get('Natura2000HabitatsDirectiveArticle4') if v3 is not None and v3: v2.append('IV') v3 = properties.get('Natura2000HabitatsDirectiveArticle5') if v3 is not None and v3: v2.append('V') if len(v2) == 0: v.append('habitatdirektivet') elif len(v2) == 1: v.append('habitatdirektivets bilaga ' + v2[0]) else: v.append('habitatdirektivets bilagor ' + ', '.join(v2[0:-1]) + ' & ' + v2[-1]) v2 = properties.get('TaxonIsSignalSpecie') if v2 is not None and v2: v.append('signalart enligt Skogsstyrelsen') v2 = properties.get('BirdDirective') if v2 is not None and v2: v.append('fågeldirektivet') if len(v) > 0: v[0] = v[0][0].upper() + v[0][1:] v = '; '.join(v) else: v = '—' print(f'Naturvård: {v}') sum_count += 1 feature = layer.GetNextFeature() while feature is not None: printObservation(feature) feature = layer.GetNextFeature() if sum_count == 0: print('No matching observations found!') exit(0) max_count = max(summary.values()) width = len(str(max_count)) print('\n\nSummary\n-------') for name, n in sorted(summary.items(), key=lambda x: (-x[1], x[0])): print('{0:{width}d}× {1}'.format(n, name, width=width)) print('-' * (width+1)) print(f'{sum_count} matching observations')