#!/usr/bin/python3 #---------------------------------------------------------------------- # Create a QGIS project with observations from Artdatabanken # Copyright © 2023 Guilhem Moulin # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #---------------------------------------------------------------------- import argparse import sys import json import re import requests import yaml import tempfile from datetime import datetime from requests.exceptions import HTTPError from xdg import xdg_config_home from pathlib import Path from osgeo import gdal, ogr, osr gdal.UseExceptions() from qgis.core import ( QgsApplication, QgsCoordinateReferenceSystem, QgsLayerDefinition, QgsLayerTreeLayer, QgsLayerTreeModel, QgsMapLayer, QgsPointXY, QgsProject, QgsRectangle, QgsUnitTypes, QgsVectorLayer, QgsWkbTypes ) from qgis.gui import QgsLayerTreeView, QgsMapCanvas from PyQt5.QtGui import QColor from PyQt5.QtCore import Qt target_srs = osr.SpatialReference() target_srs.ImportFromEPSG(3006) target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) gpkg_drv = ogr.GetDriverByName('GPKG') geojson_drv = ogr.GetDriverByName('GeoJSON') programName = 'gis-observation-map' with Path(xdg_config_home()).joinpath(programName).joinpath('config.yml').open(mode='r') as fp: config = yaml.load(fp, Loader=yaml.FullLoader) if config.get('QGIS') is None: config['QGIS'] = {} def ePath(v): return Path(v).expanduser() class geometryAction(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): super().__init__(option_strings, dest, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) items = items.copy() style = getattr(namespace, 'geometry_style', None) # get current style layername = getattr(namespace, 'geometry_layername', None) # get current layer name items.append({'path': values, 'style': style, 'layername': layername}) setattr(namespace, self.dest, items) parser = argparse.ArgumentParser( description='Create a QGIS project with observations from Artdatabanken.', prog = programName, usage='''%(prog)s --project-home=DIR --project-name=NAME {--geometry=FILE|--point=Y,X} ... %(prog)s --observation-file=NAME {--geometry=FILE|--point=Y,X} ...''' ) parser.add_argument('--project-home', type=ePath, metavar='DIRECTORY', help='Project home directory') parser.add_argument('--project-name', metavar='NAME', help='Project filename (and title) relative to --project-home') parser.add_argument('--topo-basedir', default=config['QGIS'].get('Topo-Basedir'), type=ePath, metavar='DIRECTORY', help='Base directory for "Topografi 10", "Topografi 50", etc.') parser_geom = parser.add_argument_group('Geographic area of interest') parser_geom.add_argument('--geometry-style', type=ePath, dest='geometry_style', metavar='STYLE_FILE', help='QGIS Layer Style File (*.qml) to apply to subsequent geometry files') parser_geom.add_argument('--geometry-layername', dest='geometry_layername', metavar='NAME', help='Layer names for subsequent geometry files') parser_geom.add_argument('--geometry', nargs='*', default=[], type=ePath, metavar='GEOMETRY_FILE', action=geometryAction, help='Geometry file of interest') parser_geom.add_argument('--point', nargs='*', metavar='Y,X', default=[], help=f'Coordinates of interest (in {target_srs.GetName()})') parser_geom.add_argument('--margin', metavar='N', type=int, help='Margin (in meters) around geometry envelopes') parser_obs = parser.add_argument_group('Observations') parser_obs.add_argument('--no-observations', action='store_true', help='Do not fetch observations within the area(s) of interest') parser_obs.add_argument('--observation-file', type=Path, metavar='NAME', help='Observation file name relative to --project-home') parser_obs.add_argument('--observation-format', metavar='FORMAT', help='Format for the observation file') parser_obs.add_argument('--observation-style', default=config['QGIS'].get('ObservationStyle'), metavar='STYLE_FILE', type=ePath, help='QGIS Layer Style File (*.qml) to apply to the observation layer (default: %(default)s)') parser_filter = parser.add_argument_group('Search filter') parser_filter.add_argument('--data-provider', nargs='*', metavar='IDENTIFIER', default=[], help='Data provider identifier(s), for instance "Artportalen,MVM"') parser_filter.add_argument('--since', metavar='YYYY-MM-DD', help='Start date for observations in ISO 8601 format') parser_filter.add_argument('--until', metavar='YYYY-MM-DD', help='End date for observations in ISO 8601 format') args = parser.parse_args() if args.no_observations and (args.observation_file is not None or args.observation_format is not None): parser.error('--no-observations is mutually exclusive with --observation-*') if args.project_home is not None and args.project_name is not None: QgsApplication.setPrefixPath('/usr/bin/qgis', True) qgs = QgsApplication([], False) qgs.initQgis() args.project_home.mkdir(mode=0o755, exist_ok=True) projectPath = args.project_home.joinpath(args.project_name) if projectPath.suffix not in ['.qgs', '.qgz']: projectPath = projectPath.with_suffix('.qgz') if projectPath.exists(): raise Exception(f'{projectPath}: file exists') projectInstance = QgsProject.instance() projectInstance.setFileName(projectPath.as_posix()) projectInstance.setDistanceUnits(QgsUnitTypes.DistanceMeters) projectInstance.setAreaUnits(QgsUnitTypes.AreaSquareMeters) projectInstance.setTitle(Path(args.project_name).stem) layerTreeRoot = projectInstance.layerTreeRoot() qgis_crs = QgsCoordinateReferenceSystem(target_srs.ExportToWkt()) if qgis_crs.isValid(): projectInstance.setCrs(qgis_crs) else: raise Exception('Invalid CRS') if args.observation_file is None: args.observation_file = Path('fynd') elif args.observation_file is not None: projectInstance = None elif args.project_home is None or args.project_name is None: parser.print_usage() exit(1) if len(args.geometry) == 0 and len(args.point) == 0: parser.print_usage() exit(1) if len(args.point) > 0 and (args.margin is None or args.margin <= 0): parser.error('--point requires positive --margin') for k in ['since', 'until']: v = getattr(args, k, None) if v is not None: d = datetime.fromisoformat(v) setattr(args, k, d.isoformat()) if projectInstance is not None: if len(args.geometry) > 1: sourceGroup = layerTreeRoot.addGroup('Source geometry') sourceGroup.setItemVisibilityChecked(False) for i, geom in enumerate(args.geometry): path = geom['path'].as_posix() layername = geom['layername'] if layername is None: ds = ogr.Open(path, update=0) layername = ds.GetLayer().GetName() ds = None else: path = f'{path}|layername={layername}' layer = QgsVectorLayer(path, layername.title(), 'ogr') if not layer.isValid(): raise Exception(f'ERROR: {path}: failed to load in QGIS') layer.setReadOnly(True) layer.setFlags(QgsMapLayer.Identifiable | QgsMapLayer.Searchable) if geom['style'] is not None: layer.loadNamedStyle(geom['style'].as_posix()) elif layer.geometryType() == QgsWkbTypes.PointGeometry: symbol = layer.renderer().symbol().symbolLayers()[0] symbol.setSize(2.0) symbol.setFillColor(QColor('#db1e2a')) symbol.setStrokeColor(QColor('#801119')) symbol.setStrokeStyle(Qt.SolidLine) symbol.setStrokeWidth(0) elif layer.geometryType() == QgsWkbTypes.LineGeometry: symbol = layer.renderer().symbol().symbolLayers()[0] symbol.setColor(QColor('black')) symbol.setPenStyle(Qt.DashLine) symbol.setWidth(0.39) elif layer.geometryType() == QgsWkbTypes.PolygonGeometry: symbol = layer.renderer().symbol().symbolLayers()[0] symbol.setBrushStyle(Qt.NoBrush) symbol.setStrokeColor(QColor('black')) symbol.setStrokeStyle(Qt.DashLine) symbol.setStrokeWidth(0.39) if len(args.geometry) == 1: projectInstance.addMapLayer(layer) layerTreeRoot.findLayer(layer.id()).setItemVisibilityChecked(False) else: projectInstance.addMapLayer(layer, False) sourceGroup.addLayer(layer) def gpkg_intersects(path, source_geometry): ds = gpkg_drv.Open(path, update=0) for i in range(ds.GetLayerCount()): layer = ds.GetLayerByIndex(i) source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) feature = layer.GetNextFeature() while feature is not None: geometry = feature.GetGeometryRef() geometry.Transform(transform_srs) if geometry.Intersects(source_geometry): ds = None return True feature = layer.GetNextFeature() return False def idx_intersects(idx_path, source_geometry): ds = geojson_drv.Open(idx_path, update=0) if ds.GetLayerCount() != 1: print(f'WARN: {idx_path}: has {ds.GetLayerCount()} != 1 layers', file=sys.stderr) layer = ds.GetLayer() source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) locFieldIdx = layer.FindFieldIndex('location', True) locations = [] union = ogr.Geometry(ogr.wkbMultiPolygon) feature = layer.GetNextFeature() while feature is not None: geometry = feature.GetGeometryRef() geometry.Transform(transform_srs) if geometry.Intersects(source_geometry): loc = feature.GetField(locFieldIdx) locations.append(loc) union.AddGeometry(geometry) feature = layer.GetNextFeature() contains = union.UnionCascaded().Contains(source_geometry) ds = None return locations, contains def find_qlr(basedir): if not basedir.is_dir(): return None qlr = sorted(basedir.glob('*.qlr')) if len(qlr) == 0 or not qlr[0].is_file(): print(f'WARN: {basedir}: does not contain any layer definition file, skipping', file=sys.stderr) return None return qlr[0] clipGeometry = ogr.Geometry(ogr.wkbMultiPolygon) for pt in args.point: # northing,easting in target SRS y, x = pt.split(',', 1) x = int(x) y = int(y) minX = x - args.margin maxX = x + args.margin minY = y - args.margin maxY = y + args.margin ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint_2D(minX, maxY) ring.AddPoint_2D(maxX, maxY) ring.AddPoint_2D(maxX, minY) ring.AddPoint_2D(minX, minY) ring.AddPoint_2D(minX, maxY) bbox = ogr.Geometry(ogr.wkbPolygon) bbox.AddGeometry(ring) clipGeometry.AddGeometry(bbox) for geom in args.geometry: ds = ogr.Open(geom['path'].as_posix(), update=0) if geom['layername'] is None: layer = ds.GetLayer() else: layer = ds.GetLayerByName(geom['layername']) source_srs = layer.GetSpatialRef() transform_srs = osr.CoordinateTransformation(source_srs, target_srs) feature = layer.GetNextFeature() while feature is not None: geometry = feature.GetGeometryRef() if not geometry.IsValid(): raise Exception('Invalid geometry') geometryType = geometry.GetGeometryType() geometry = geometry.Clone() geometry.FlattenTo2D() geometry.Transform(transform_srs) if args.margin is not None: minX, maxX, minY, maxY = geometry.GetEnvelope() minX -= args.margin maxX += args.margin minY -= args.margin maxY += args.margin ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint_2D(minX, maxY) ring.AddPoint_2D(maxX, maxY) ring.AddPoint_2D(maxX, minY) ring.AddPoint_2D(minX, minY) ring.AddPoint_2D(minX, maxY) bbox = ogr.Geometry(ogr.wkbPolygon) bbox.AddGeometry(ring) clipGeometry.AddGeometry(bbox) elif geometryType == ogr.wkbPoint or geometryType == ogr.wkbMultiPoint: parser.error('Point geometry requires positive --margin') elif geometryType == ogr.wkbLineString or geometryType == ogr.wkbMultiLineString: parser.error('Line geometry requires positive --margin') elif geometryType == ogr.wkbPolygon: clipGeometry.AddGeometry(geometry) elif geometryType == ogr.wkbMultiPolygon: for i in range(geometry.GetGeometryCount()): geom2 = geometry.GetGeometryRef(i) clipGeometry.AddGeometry(geom2) else: raise Exception('Unsuported geometry (try --margin=0)') feature = layer.GetNextFeature() clipGeometry = clipGeometry.UnionCascaded() if not clipGeometry.IsValid(): raise Exception('Invalid geometry') if projectInstance is not None: pt0X, pt1X, pt0Y, pt1Y = clipGeometry.GetEnvelope() canvas = QgsMapCanvas() extent = QgsRectangle(QgsPointXY(pt0X, pt0Y), QgsPointXY(pt1X, pt1Y)) canvas.setExtent(extent) def geometricFilterPolygon(polygon): polygon2 = [] if polygon.GetGeometryType() != ogr.wkbPolygon: raise Exception('Not a polygon') for i in range(polygon.GetGeometryCount()): ring = polygon.GetGeometryRef(i) n = ring.GetPointCount() if ring.GetGeometryType() != ogr.wkbLineString: raise Exception('Not a linestring') if n > 0 and ring.GetPoint(0) != ring.GetPoint(n-1): raise Exception('Not a linear ring') ring2 = [] for i in range(n): lon, lat = ring.GetPoint_2D(i) ring2.append([lon, lat]) polygon2.append(ring2) return { 'type': 'polygon', 'coordinates': polygon2 } # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/SearchFilter.md#geographics-filter def geometricFilter(geometry): wgs84 = osr.SpatialReference() wgs84.ImportFromEPSG(4326) # WGS84 wgs84.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER) towgs84 = osr.CoordinateTransformation(target_srs, wgs84) geometry = geometry.Clone() geometry.Transform(towgs84) geometryType = geometry.GetGeometryType() myGeometries = [] if geometryType == ogr.wkbPoint or geometryType == ogr.wkbMultiPoint: parser.error('Point geometry requires positive --margin') elif geometryType == ogr.wkbLineString or geometryType == ogr.wkbMultiLineString: parser.error('Line geometry requires positive --margin') elif geometryType == ogr.wkbPolygon: gfp = geometricFilterPolygon(geometry) myGeometries.append(gfp) elif geometryType == ogr.wkbMultiPolygon: for i in range(geometry.GetGeometryCount()): geom = geometry.GetGeometryRef(i) gfp = geometricFilterPolygon(geom) myGeometries.append(gfp) else: raise Exception('Unsuported geometry (try --margin=0)') return myGeometries artDataBankenURL = 'https://api.artdatabanken.se' artDataBankenHeaders = { 'X-Api-Version': '1.5', 'X-Requesting-System': '', # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/Authentication.md 'Ocp-Apim-Subscription-Key': config['ArtDataBanken']['Subscription-Key'] } def getObservations(taxonLists, taxonRedlistCategories, searchFilter): # https://api-portal.artdatabanken.se/docs/services/sos-api-v1/operations/Exports_DownloadGeoJson headers = artDataBankenHeaders.copy() headers['Content-Type'] = 'application/json' headers['Authorization'] = config['ArtDataBanken']['Authorization'] params = { 'cultureCode': 'sv-SE', 'flat': 'true', 'excludeNullValues': 'true', 'gzip': 'false', #'sensitiveObservations': 'true', 'validateSearchFilter': 'true', } resp = requests.post( artDataBankenURL + '/species-observation-system/v1/Exports/Download/GeoJson', headers=headers, params=params, data=json.dumps(searchFilter) ) resp.raise_for_status() if resp.text == '': obs = { 'crs': 'EPSG:4326', 'type': 'FeatureCollection', 'features': [] } else: obs = resp.json() # https://www.rfc-editor.org/rfc/rfc7946 if obs is None or type(obs) != dict or 'type' not in obs.keys(): raise Exception('invalid GeoJSON output') if 'crs' not in obs.keys(): print('WARN: GeoJSON output lacks CRS', file=sys.stderr) if obs['type'] == 'FeatureCollection' and type(obs.get('features')) == list: print(f'{len(obs["features"])} observations found', file=sys.stderr) for feat in obs['features']: properties = feat['properties'] if type(feat) != dict or feat.get('type') != 'Feature' or type(properties) != dict: print('WARN: Invalid feature in GeoJSON output', file=sys.stderr) continue tid = properties.get('DyntaxaTaxonId') if tid is None: print('WARN: Feature lacks taxon ID', file=sys.stderr) continue for k, taxonList in taxonLists.items(): v = (tid in taxonList) if k in properties.keys() and properties[k] != v: print(f'WARN: #{tid} {k}: {properties[k]} → {v}', file=sys.stderr) properties[k] = v if tid in taxonRedlistCategories.keys(): k = 'RedlistCategory' v = taxonRedlistCategories[tid] if k in properties.keys() and properties[k] != v: print(f'WARN: #{tid} {k}: {properties[k]} → {v}', file=sys.stderr) properties[k] = v if args.observation_file.is_absolute(): path = args.observation_file elif args.project_home is None: path = args.observation_file.expanduser() else: path = args.project_home.joinpath(args.observation_file) drv = None suffix = path.suffix if suffix is not None and suffix != '' and suffix != '.': # try to infer format from extension for i in range(ogr.GetDriverCount()): drv2 = ogr.GetDriver(i) if drv2 is None: continue v = drv2.GetMetadataItem(gdal.DMD_EXTENSIONS) if v is None: continue if suffix.removeprefix('.') in v.split(' '): drv = drv2 break if drv is None: if args.observation_format is None: observation_format = 'GPKG' else: observation_format = args.observation_format drv = ogr.GetDriverByName(observation_format) if drv is None: raise Exception(f'Invalid format {observation_format}') suffixes = drv.GetMetadataItem(gdal.DMD_EXTENSIONS).split(' ') if len(suffixes) < 1 or suffixes[0] == '': raise Exception(f'Invalid format {observation_format}') if suffix is not None and suffix != '' and suffix != '.': print(f'WARN: unknown extension "{suffix}", appending ".{suffixes[0]}"', file=sys.stderr) suffix += '.' + suffixes[0] path = path.with_suffix(suffix) if args.observation_format is not None and drv.name.lower() != args.observation_format.lower(): print(f'WARN: overwriting observation file format to {drv.name}', file=sys.stderr) layername = path.stem.lower() obs['name'] = layername if drv.name == geojson_drv.name: # for GeoJSON, don't reproject and pretty-print the output; this # is useful for manual inspection and/or debugging with path.open(mode='w') as fp: json.dump(obs, fp, indent=2, ensure_ascii=False) else: with tempfile.NamedTemporaryFile(suffix='.geojson', mode='w') as fp: json.dump(obs, fp) fp.flush() gdal.VectorTranslate( path.as_posix(), fp.name, format=drv.name, layerName=layername, geometryType='POINT', accessMode='overwrite', reproject=True, dstSRS=target_srs) obs = None if projectInstance is not None: query = '''"IsPositiveObservation" AND "IsNaturalOccurrence" AND "CoordinateUncertaintyInMeters" <= 250''' layer = QgsVectorLayer(f'{path.as_posix()}|layername={layername}|subset={query}', layername.title(), 'ogr') if not layer.isValid(): raise Exception(f'ERROR: {path}: failed to load in QGIS') if args.observation_style is not None: layer.loadNamedStyle(args.observation_style.as_posix()) layer.setReadOnly(True) layer.setFlags(QgsMapLayer.Identifiable | QgsMapLayer.Searchable) projectInstance.addMapLayer(layer) layerTreeRoot.findLayer(layer.id()).setItemVisibilityChecked(True) def mapDataProviders(names): # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/DataProviders.md params = { 'cultureCode': 'sv-SE', 'includeProvidersWithNoObservations': 'false', } resp = requests.get( artDataBankenURL + '/species-observation-system/v1/DataProviders', headers=artDataBankenHeaders ) resp.raise_for_status() dataProviders = resp.json() #print(json.dumps(dataProviders, indent=2, ensure_ascii=False)) ids = [] for ns in names: for name in ns.split(','): i = None for p in dataProviders: if p['identifier'].lower() == name.lower(): if i is not None: raise Exception(f'multiple results found for provider "{name}"') i = p['id'] if i is None: raise Exception(f'Unknown data provider "{name}"\nValid providers are {", ".join([p["identifier"] for p in dataProviders])}') ids.append(i) return ids def getTaxonLists(): resp = requests.get( artDataBankenURL + '/species-observation-system/v1/TaxonLists?cultureCode=en-US', headers=artDataBankenHeaders ) resp.raise_for_status() taxonLists = resp.json() #print(json.dumps(taxonLists, indent=2, ensure_ascii=False)) taxonLists2 = {} # https://github.com/biodiversitydata-se/SOS/blob/master/Src/SOS.lib/Resources/TaxonLists.json # https://artfakta.se/artinformation/taxa i1 = getTaxonList(taxonLists, taxonLists2, 'ProtectedByLaw', 'Protected by law species') i2 = getTaxonList(taxonLists, taxonLists2, 'TaxonIsSignalSpecie', 'Signal species') #getTaxonList(taxonLists, taxonLists2, 'TaxonIsInvasive', 'Invasive species') #getTaxonList(taxonLists, taxonLists2, 'TaxonIsInvasiveInSweden', 'Invasive species in Sweden') #getTaxonList(taxonLists, taxonLists2, 'TaxonIsInvasiveEuRegulation', 'EU regulation 1143/2014') i8 = getTaxonList(taxonLists, taxonLists2, 'Natura2000HabitatsDirective', 'Habitats directive species') getTaxonList(taxonLists, taxonLists2, 'Natura2000HabitatsDirectiveArticle2', 'Habitats directive Annex 2') getTaxonList(taxonLists, taxonLists2, 'Natura2000HabitatsDirectiveArticle2PrioritySpecie', 'Habitats directive Annex 2, priority species') getTaxonList(taxonLists, taxonLists2, 'Natura2000HabitatsDirectiveArticle4', 'Habitats directive Annex 4') getTaxonList(taxonLists, taxonLists2, 'Natura2000HabitatsDirectiveArticle5', 'Habitats directive Annex 5') # sanity check directives = [ 'Natura2000HabitatsDirectiveArticle2', 'Natura2000HabitatsDirectiveArticle2PrioritySpecie', 'Natura2000HabitatsDirectiveArticle4', 'Natura2000HabitatsDirectiveArticle5' ] d0 = 'Natura2000HabitatsDirective' for i in taxonLists2[d0]: if not any(map(lambda d: i in taxonLists2[d], directives)): raise Exception(f'missing taxon #{i} (in {d0}) from directives {",".join(directives)}') for d in directives: for i in taxonLists2[d]: if i not in taxonLists2[d0]: raise Exception(f'missing taxon #{i} (in {d}) from {d0}') for i in taxonLists2['Natura2000HabitatsDirectiveArticle2PrioritySpecie']: if i not in taxonLists2['Natura2000HabitatsDirectiveArticle2']: raise Exception(f'missing taxon #{i} (in Natura2000HabitatsDirectiveArticle2PrioritySpecie) from Natura2000HabitatsDirectiveArticle2') #i13 = getTaxonList(taxonLists, taxonLists2, 'BirdDirective', 'Birds Directive') #getTaxonList(taxonLists, taxonLists2, 'BirdDirectiveArticle1', 'Birds directive - Annex 1') #getTaxonList(taxonLists, taxonLists2, 'BirdDirectiveArticle2', 'Birds directive - Annex 2') #getTaxonList(taxonLists, taxonLists2, 'TaxonIsPriorityBird', 'Priority birds') # #directives = ['BirdDirectiveArticle1', 'BirdDirectiveArticle2'] #d0 = 'BirdDirective' ##for i in taxonLists2[d0]: ## if not any(map(lambda d: i in taxonLists2[d], directives)): ## raise Exception(f'missing taxon #{i} (in {d0}) from directives {",".join(directives)}') #for d in directives: # for i in taxonLists2[d]: # if i not in taxonLists2[d0]: # raise Exception(f'missing taxon #{i} (in {d}) from {d0}') i7 = getTaxonList(taxonLists, taxonLists2, 'TaxonIsRedlisted', 'Redlisted species') taxonRedlistCategories = getTaxonRedlistCategories(taxonLists, i7) i18 = getTaxonList(taxonLists, None, None, 'Swedish forest agency nature conservation species') taxonListIds = [i1, i2, i7, i8, i18] return taxonLists2, taxonRedlistCategories, taxonListIds def getTaxonList(taxonLists, taxonLists2, key, name): i = None for t in taxonLists: if t['name'] == name: if i is not None: raise Exception(f'multiple results found for taxon list "{name}"') i = t['id'] if i is None: raise Exception(f'no result found for taxon list "{name}"') if taxonLists2 is not None and key is not None: resp = requests.get( artDataBankenURL + f'/species-observation-system/v1/TaxonLists/{i}/Taxa', headers=artDataBankenHeaders ) resp.raise_for_status() resp = resp.json() if type(resp) != list: raise Exception(f'expected list, got {type(resp)}') taxonLists2[key] = {r['id'] for r in resp} return i def getTaxonRedlistCategories(taxonLists, i): taxonRedlistCategories = {} r = re.compile(r'\(([A-Z][A-Z])\)\Z') for t in taxonLists: if t.get('parentId') != i: continue name = t['name'] c = r.search(name) if not c: raise Exception(f'invalid redlist name "{name}"') c = c.group(1) resp = requests.get( artDataBankenURL + f'/species-observation-system/v1/TaxonLists/{t["id"]}/Taxa', headers=artDataBankenHeaders ) resp.raise_for_status() resp = resp.json() if type(resp) != list: raise Exception(f'expected list, got {type(resp)}') for t in resp: j = t['id'] if j in taxonRedlistCategories.keys(): raise Exception(f'duplicate redlist classification for taxon #{j}') taxonRedlistCategories[j] = c return taxonRedlistCategories if not args.no_observations: geographicsFilter = { 'geometries': geometricFilter(clipGeometry), 'maxAccuracy': 2500, 'considerObservationAccuracy': True, 'considerDisturbanceRadius': False } taxonLists, taxonRedlistCategories, taxonListIds = getTaxonLists() # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/SearchFilter.md searchFilter = { 'geographics': geographicsFilter, 'determinationFilter': 'NoFilter', 'notRecoveredFilter': 'DontIncludeNotRecovered', 'occurrenceStatus': 'Present', 'verificationStatus': 'BothVerifiedAndNotVerified', 'output': { 'fields': [ # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/SearchFilter.md#fields # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/Observation.md 'dataProviderId', 'datasetName', #'basisOfRecord', 'rightsHolder', 'modified', 'sensitive', 'measurementOrFacts', 'projects', 'occurrence.occurrenceId', 'occurrence.occurrenceRemarks', 'occurrence.recordedBy', 'occurrence.reportedBy', 'occurrence.reportedDate', 'occurrence.occurrenceStatus', 'occurrence.activity', 'occurrence.behavior', 'occurrence.biotope', 'occurrence.biotopeDescription', 'occurrence.lifeStage', 'occurrence.reproductiveCondition', 'occurrence.sex', 'occurrence.associatedMedia', 'occurrence.associatedReferences', 'occurrence.individualCount', 'occurrence.organismQuantityInt', 'occurrence.organismQuantityUnit', 'occurrence.sensitivityCategory', 'occurrence.isNaturalOccurrence', 'occurrence.isNeverFoundObservation', 'occurrence.isNotRediscoveredObservation', 'occurrence.isPositiveObservation', 'occurrence.substrate.description', 'occurrence.substrate.name', 'occurrence.length', 'occurrence.weight', 'occurrence.url', 'event.startDate', 'event.endDate', 'event.habitat', 'event.eventRemarks', 'event.discoveryMethod', 'event.measurementOrFacts', 'event.fieldNotes', 'identification.verified', 'identification.uncertainIdentification', 'identification.verificationStatus', 'identification.confirmedBy', 'identification.confirmedDate', 'identification.identifiedBy', 'identification.dateIdentified', 'identification.verifiedBy', 'identification.determinationMethod', 'identification.identificationRemarks', 'location.locality', 'location.county', 'location.municipality', 'location.coordinateUncertaintyInMeters', 'taxon.id', 'taxon.scientificName', 'taxon.scientificNameAuthorship', 'taxon.vernacularName', 'taxon.genus', 'taxon.family', 'taxon.order', 'taxon.class', 'taxon.phylum', 'taxon.kingdom', 'taxon.attributes.taxonCategory', 'taxon.attributes.organismGroup', 'taxon.attributes.redlistCategory', ] } } # https://artfakta.se/artinformation/taxa/biota-0/detaljer kingdomIds = { 'Animalia': 5000001, 'Archaea': 5000082, 'Bacteria': 5000052, 'Chromista': 5000055, 'Fungi': 5000039, 'Plantae': 5000045, 'Protozoa': 5000060, 'Viruses': 5000083, 'Algae': 6001047, } searchFilter['taxon'] = { 'ids': list(map(lambda k: kingdomIds[k], ['Fungi', 'Plantae'])), 'includeUnderlyingTaxa': True, # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/SearchFilter.md#taxon-lists 'taxonListIds': taxonListIds, 'taxonListOperator': 'Filter' } if args.data_provider is not None: searchFilter['dataProvider'] = { 'ids': mapDataProviders(args.data_provider) } if args.since is not None or args.until is not None: searchFilter['date'] = { # https://github.com/biodiversitydata-se/SOS/blob/master/Docs/SearchFilter.md#date-filter 'startDate': args.since, 'endDate': args.until, 'dateFilterType': 'OverlappingStartDateAndEndDate' } getObservations(taxonLists, taxonRedlistCategories, searchFilter) if args.project_home is not None and projectInstance is not None: for item in config['QGIS'].get('NatureValue', []): name = item['Name'].title() srcPath = Path(item['Source']).expanduser() dstPath = args.project_home.joinpath(srcPath.name) fmt = item.get('Format') if fmt is not None: drv = ogr.GetDriverByName(fmt) if drv is None: raise Exception(f'Invalid format {fmt}') suffixes = drv.GetMetadataItem(gdal.DMD_EXTENSIONS).split(' ') if dstPath.suffix[1:] not in suffixes: dstPath = dstPath.with_suffix('.' + suffixes[0]) layerName = item.get('LayerName') srcDs = gdal.OpenEx( srcPath.as_posix(), gdal.OF_VECTOR, open_options=item.get('SourceOpenOptions', []) ) dstPath = dstPath.as_posix() gdal.VectorTranslate( dstPath, srcDs, format=fmt, accessMode='overwrite', reproject=True, dstSRS=target_srs, clipDst=clipGeometry.ExportToWkt() if item.get('Clip', True) else None, datasetCreationOptions=item.get('DatasetCreationOptions', []), layerCreationOptions=item.get('LayerCreationOptions', []), geometryType=item.get('GeometryType'), skipFailures=True, layers=[layerName] if layerName is not None else None ) srcDs = None if layerName is not None: dstPath = f'{dstPath}|layername={layerName}' layer = QgsVectorLayer(dstPath, name, 'ogr') if not layer.isValid(): raise Exception(f'ERROR: {dstPath}: failed to load in QGIS') projectInstance.addMapLayer(layer, False) layerTreeRoot.insertChildNode(-1, QgsLayerTreeLayer(layer)) # last layer.setReadOnly(True) isVisible = item.get('Visible', True) layerTreeRoot.findLayer(layer.id()).setItemVisibilityChecked(isVisible) style = item.get('Style') if style is not None: style = Path(style).expanduser() layer.loadNamedStyle(style.as_posix()) topo_maps = ['Topografi 10', 'Topografi 50', 'Topografi 100'] if args.topo_basedir is not None and projectInstance is not None: idxContains = {} for topo in topo_maps: qlr_paths = [] qlr_merged = None d = args.topo_basedir.joinpath(topo) if not d.is_dir(): continue idx = d.joinpath('index.geojson') if idx.is_file(): tiles, idxContains[topo] = idx_intersects(idx.as_posix(), clipGeometry) if len(tiles) == 0: continue if args.project_home is None: for tile in tiles: t = d.joinpath(tile) qlr_path = find_qlr(t) if qlr_path is None: continue qlr_paths.append(qlr_path) else: t0 = d.joinpath(tiles[0]) qlr_path = find_qlr(d) if qlr_path is None: continue d2 = args.project_home.joinpath(topo) d2.mkdir(mode=0o755, exist_ok=True) d2.joinpath(qlr_path.name).symlink_to(qlr_path, target_is_directory=False) qlr_merged = d2.joinpath(qlr_path.name) gpkgs = set() for tile in tiles: for gpkg in d.joinpath(tile).glob('*.gpkg'): gpkgs.add(gpkg.name) for gpkg in gpkgs: dst = d2.joinpath(gpkg) if dst.exists(): raise Exception(f'{dst}: file exists') for tile in tiles: src = d.joinpath(tile).joinpath(gpkg) if src.is_file(): gdal.VectorTranslate(dst.as_posix(), src.as_posix(), format='GPKG', accessMode='upsert') else: if args.project_home is not None: l = args.project_home.joinpath(topo) l.symlink_to(d, target_is_directory=True) d = l for t in sorted(d.glob('*')): qlr_path = find_qlr(t) if qlr_path is None: continue gpkg_name = 'mark.gpkg' gpkg_path = t.joinpath(gpkg_name) if not gpkg_path.is_file(): print(f'WARN: {t}: lacks {gpkg_name}, skipping', file=sys.stderr) continue if not gpkg_intersects(gpkg_path.as_posix(), clipGeometry): continue qlr_paths.append(qlr_path) if projectInstance is not None: if qlr_merged is not None: QgsLayerDefinition.loadLayerDefinition(qlr_merged.as_posix(), projectInstance, layerTreeRoot) lt = layerTreeRoot.children()[-1] lt.setName(topo) lyrSubTree = [lt] elif len(qlr_paths) > 0: rootGroup = layerTreeRoot.addGroup(topo) for qlr_path in qlr_paths: QgsLayerDefinition.loadLayerDefinition(qlr_path.as_posix(), projectInstance, rootGroup) lyrSubTree = rootGroup.children() rootGroup.setExpanded(False) if len(lyrSubTree) == len(qlr_paths): for i, lt in enumerate(lyrSubTree): name = qlr_paths[i].parent.stem lt.setName(name) for lt in lyrSubTree: lt.setExpanded(False) lt.setItemVisibilityChecked(True) for lyr in lt.findLayers(): lyr.setItemVisibilityChecked(True) lyr = lyr.layer() lyr.setReadOnly(True) lyr.setFlags(QgsMapLayer.Removable) if projectInstance is not None: maxScale = 1 for topo in topo_maps: minScale = None rootGroup = QgsProject.instance().layerTreeRoot().findGroup(topo) if rootGroup is None: continue if idxContains.get(topo) == False: # if the tileset doesn't entirely covers the area of interest, # then uncheck the layer group and disable scale-based visibility rootGroup.setItemVisibilityChecked(False) elif topo == 'Topografi 10': minScale = 50000 elif topo == 'Topografi 50': minScale = 100000 elif topo == 'Topografi 100': minScale = 10000000 if minScale is not None: for lt in rootGroup.findLayers(): lyr = lt.layer() lyr.setScaleBasedVisibility(True) lyr.setMinimumScale(minScale) lyr.setMaximumScale(maxScale) maxScale = minScale if projectInstance is not None: projectInstance.write()