-rw-r--r--  common.py          16
-rw-r--r--  config.yml          2
-rw-r--r--  import_source.py   69
-rw-r--r--  schema.sql         84
-rwxr-xr-x  webmap-download     7
-rwxr-xr-x  webmap-import     361
6 files changed, 495 insertions(+), 44 deletions(-)
diff --git a/common.py b/common.py
index eab9dd5..b1d14ba 100644
--- a/common.py
+++ b/common.py
@@ -27,6 +27,7 @@ from stat import S_ISDIR
import math
import logging
from typing import Any, Optional, Never
+from hashlib import sha256
from xdg.BaseDirectory import xdg_config_home
import yaml
@@ -143,6 +144,11 @@ def parse_config(path : Optional[Path] = None,
return config
+# pylint: disable-next=invalid-name
+def getSourcePathLockFileName(path : str) -> str:
+ """Return the name of the lockfile associated with a source path."""
+ return sha256(path.encode('utf-8')).hexdigest() + '.lck'
+
def format_bytes(n : int, threshold : int = 768, precision : int = 2) -> str:
"""Format a number of bytes to a SI unit"""
@@ -174,6 +180,16 @@ def escape_identifier(identifier : str) -> str:
# SQL:1999 delimited identifier
return '"' + identifier.replace('"', '""') + '"'
+def escape_literal_str(literal : str) -> str:
+ """Escape the given character string literal, cf.
+ swig/python/gdal-utils/osgeo_utils/samples/validate_gpkg.py:_esc_literal()."""
+
+ if literal is None or '\x00' in literal:
+ raise RuntimeError(f'Invalid literal "{literal}"')
+
+ # SQL:1999 character string literal
+ return '\'' + literal.replace('\'', '\'\'') + '\''
+
######
# The function definitions below are taken from cpython's source code
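For illustration, a minimal sketch of the two new helpers added above (the sample path is an assumption, not taken from the configuration):

    from common import escape_literal_str, getSourcePathLockFileName

    # SQL:1999 single-quote doubling, the literal counterpart of escape_identifier()
    assert escape_literal_str("it's") == "'it''s'"

    # lock file name is the SHA-256 hex digest of the source path plus '.lck'
    lck = getSourcePathLockFileName('dam/dammar.zip')   # hypothetical source path
    assert len(lck) == 64 + len('.lck')
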
diff --git a/config.yml b/config.yml
index b13f363..2d6a526 100644
--- a/config.yml
+++ b/config.yml
@@ -108,6 +108,8 @@ dataset:
open-options-publish:
USER: webmap_guest
+ sourcecache-layername: public.sourcecache
+
# Optional dictionary of default layer creation options, cf.
# https://gdal.org/drivers/vector/pg.html#layer-creation-options or
# https://gdal.org/drivers/vector/gpkg.html#layer-creation-options
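The importer below looks this table up by name; when the key is unset or the table is missing, the cache is simply bypassed. A hedged sketch of that fallback, using the names introduced in webmap-import:

    lyr_sourcecache = getSourceCacheLayer(dso,
                                          config['dataset'].get('sourcecache-layername', None),
                                          force=args.force)
    if lyr_sourcecache is None:
        sourcesMtimeNS = None   # no stat(2) cache: importSources() reports every layer as changed
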
diff --git a/import_source.py b/import_source.py
index e30a245..1fa754c 100644
--- a/import_source.py
+++ b/import_source.py
@@ -37,7 +37,7 @@ from osgeo.gdalconst import (
)
from osgeo import gdalconst
-from common import BadConfiguration, escape_identifier
+from common import BadConfiguration, escape_identifier, escape_literal_str
from common_gdal import gdalSetOpenExArgs, gdalGetMetadataItem, formatTZFlag
def openOutputDS(def_dict : dict[str, Any]) -> gdal.Dataset:
@@ -431,18 +431,81 @@ def listArchiveMembers(namelist : list[str],
logging.debug('Ignoring archive member %s', name)
return members
+def getSourceCacheKey(source : dict[str,Any]) -> tuple[str,str,str]:
+ """Get the unique triplet (source_path, archive_member, layername)
+ for the layer source which can be used for attribute filters and
+ lookups."""
+ importdef = source['import']
+ sourcedef = source['source']
+ source_path = sourcedef['path']
+ archive_member = importdef['path'] if sourcedef.get('unar', None) is not None else ''
+ layername = importdef.get('layername', '')
+ return (source_path, archive_member, layername)
+
# pylint: disable-next=dangerous-default-value
def importSources(lyr_dst : ogr.Layer,
+ dso : gdal.Dataset,
sources : dict[str,Any] = {},
cachedir : Path|None = None,
- extent : ogr.Geometry|None = None) -> None:
- """Import source layers"""
+ mtimes : dict[str, int|None]|None = None,
+ lyr_sourcecache : ogr.Layer|None = None,
+ extent : ogr.Geometry|None = None) -> bool:
+ """Import source layers."""
+ bChanged = False
for source in sources:
_importSource(lyr_dst, **source['source'],
args=source['import'],
cachedir=cachedir,
extent=extent)
+ # force the PG driver to call EndCopy() to detect errors and trigger a rollback if needed
+ # TODO [3.9] use lyr_dst.GetDataset().FlushCache() instead
+ dso.FlushCache()
+
+ if lyr_sourcecache is None:
+ bChanged = True
+ continue
+
+ source_path, archive_member, layername = getSourceCacheKey(source)
+ attributeFilter = ('(source_path,archive_member,layername) = ('
+ + escape_literal_str(source_path) + ','
+ + escape_literal_str(archive_member) + ','
+ + escape_literal_str(layername) + ')')
+ lyr_sourcecache.SetAttributeFilter(attributeFilter)
+
+ feature = lyr_sourcecache.GetNextFeature()
+ if feature is not None:
+ logging.debug('Updating existing feature in source cache for %s', attributeFilter)
+ update = True
+ assert lyr_sourcecache.GetNextFeature() is None
+ else:
+ logging.debug('Creating new feature in source cache for %s', attributeFilter)
+ update = False
+ feature = ogr.Feature(lyr_sourcecache.GetLayerDefn())
+ feature.SetFieldString(0, source_path)
+ feature.SetFieldString(1, archive_member)
+ feature.SetFieldString(2, layername)
+
+ mtime_ns = mtimes[source_path]
+ if mtime_ns is None:
+ feature.SetFieldNull(3)
+ else:
+ feature.SetFieldInteger64(3, mtime_ns)
+
+ if update:
+ # TODO with gdal 3.7 and OLCUpdateFeature capability, use UpdateFeature() instead
+ if lyr_sourcecache.SetFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not update feature in sourcecache')
+ else:
+ if lyr_sourcecache.CreateFeature(feature) != ogr.OGRERR_NONE:
+ raise RuntimeError('Could not create new feature in sourcecache')
+
+ # TODO [3.9] use lyr_sourcecache.GetDataset().FlushCache() instead
+ dso.FlushCache()
+ bChanged = True # TODO fingerprint the features to detect changes
+
+ return bChanged
+
# pylint: disable-next=dangerous-default-value
def _importSource(lyr : ogr.Layer,
path : str = '/nonexistent',
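For illustration, the triplet returned by getSourceCacheKey() and the row-value filter importSources() builds from it (the source entry below is a made-up example):

    source = {'source': {'path': 'dam/dammar.zip', 'unar': {}},       # hypothetical entry
              'import': {'path': 'dammar.gpkg', 'layername': 'dammar'}}
    src, member, lyrname = getSourceCacheKey(source)
    # ('dam/dammar.zip', 'dammar.gpkg', 'dammar')

    flt = ('(source_path,archive_member,layername) = ('
           + escape_literal_str(src) + ','
           + escape_literal_str(member) + ','
           + escape_literal_str(lyrname) + ')')
    # (source_path,archive_member,layername) = ('dam/dammar.zip','dammar.gpkg','dammar')
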
diff --git a/schema.sql b/schema.sql
index 307acac..f56f711 100644
--- a/schema.sql
+++ b/schema.sql
@@ -2,8 +2,8 @@
-- PostgreSQL database dump
--
--- Dumped from database version 15.8 (Debian 15.8-0+deb12u1)
--- Dumped by pg_dump version 15.8 (Debian 15.8-0+deb12u1)
+-- Dumped from database version 15.12 (Debian 15.12-0+deb12u2)
+-- Dumped by pg_dump version 15.12 (Debian 15.12-0+deb12u2)
SET statement_timeout = 0;
SET lock_timeout = 0;
@@ -2756,6 +2756,49 @@ ALTER SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" OWNED BY postgis."vbk:vin
--
+-- Name: sourcecache; Type: TABLE; Schema: public; Owner: webmap_import
+--
+
+CREATE TABLE public.sourcecache (
+ ogc_fid bigint NOT NULL,
+ source_path character varying(2047) NOT NULL,
+ archive_member character varying(2047) NOT NULL,
+ layername character varying(255) NOT NULL,
+ mtime_ns bigint
+);
+
+
+ALTER TABLE public.sourcecache OWNER TO webmap_import;
+
+--
+-- Name: TABLE sourcecache; Type: COMMENT; Schema: public; Owner: webmap_import
+--
+
+COMMENT ON TABLE public.sourcecache IS 'Metadata cache for source files';
+
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE; Schema: public; Owner: webmap_import
+--
+
+CREATE SEQUENCE public.sourcecache_ogc_fid_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE public.sourcecache_ogc_fid_seq OWNER TO webmap_import;
+
+--
+-- Name: sourcecache_ogc_fid_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: webmap_import
+--
+
+ALTER SEQUENCE public.sourcecache_ogc_fid_seq OWNED BY public.sourcecache.ogc_fid;
+
+
+--
-- Name: dammar ogc_fid; Type: DEFAULT; Schema: postgis; Owner: webmap_import
--
@@ -3127,6 +3170,13 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk" ALTER COLUMN ogc_fid SET DEFAULT ne
--
+-- Name: sourcecache ogc_fid; Type: DEFAULT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache ALTER COLUMN ogc_fid SET DEFAULT nextval('public.sourcecache_ogc_fid_seq'::regclass);
+
+
+--
-- Name: dammar dammar_DammID_key; Type: CONSTRAINT; Schema: postgis; Owner: webmap_import
--
@@ -4007,6 +4057,22 @@ ALTER TABLE ONLY postgis."vbk:vindkraftverk"
--
+-- Name: sourcecache sourcecache_pkey; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+ ADD CONSTRAINT sourcecache_pkey PRIMARY KEY (ogc_fid);
+
+
+--
+-- Name: sourcecache sourcecache_source_path_archive_member_layername_key; Type: CONSTRAINT; Schema: public; Owner: webmap_import
+--
+
+ALTER TABLE ONLY public.sourcecache
+ ADD CONSTRAINT sourcecache_source_path_archive_member_layername_key UNIQUE (source_path, archive_member, layername);
+
+
+--
-- Name: dammar_wkb_geometry_geom_idx; Type: INDEX; Schema: postgis; Owner: webmap_import
--
@@ -5157,6 +5223,20 @@ GRANT SELECT,USAGE ON SEQUENCE postgis."vbk:vindkraftverk_ogc_fid_seq" TO webmap
--
+-- Name: TABLE sourcecache; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT ON TABLE public.sourcecache TO webmap_guest;
+
+
+--
+-- Name: SEQUENCE sourcecache_ogc_fid_seq; Type: ACL; Schema: public; Owner: webmap_import
+--
+
+GRANT SELECT,USAGE ON SEQUENCE public.sourcecache_ogc_fid_seq TO webmap_guest;
+
+
+--
-- PostgreSQL database dump complete
--
diff --git a/webmap-download b/webmap-download
index edb624e..05aa2c4 100755
--- a/webmap-download
+++ b/webmap-download
@@ -44,12 +44,11 @@ import argparse
import itertools
from pathlib import Path
from email.utils import parsedate_to_datetime, formatdate
-from hashlib import sha256
from typing import Optional, NoReturn, Never
import requests
import common
-from common import BadConfiguration
+from common import BadConfiguration, getSourcePathLockFileName
def download_trystream(url : str, **kwargs) -> requests.Response:
"""GET a url, trying a number of times. Return immediately after the
@@ -309,9 +308,9 @@ def main() -> NoReturn:
# place an exclusive lock on a lockfile as the destination can be used by other layers
# hence might be updated in parallel
if lockdir_fd is not None:
- lockfile = sha256(dest.encode('utf-8')).hexdigest() + '.lck'
+ lockfile = getSourcePathLockFileName(dest)
# use O_TRUNC to bump lockfile's mtime
- lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o644,
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode=0o664,
dir_fd=lockdir_fd)
try:
if lockdir_fd is not None:
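With the lock file name now derived from the source path in common.py, webmap-download holds an exclusive lock while writing a source file and webmap-import (below) takes a shared one while reading it. A minimal sketch of the downloader side (lock directory and path are assumptions):

    import os
    from fcntl import flock, LOCK_EX
    from common import getSourcePathLockFileName

    lockdir_fd = os.open('/run/lock/webmap', os.O_RDONLY | os.O_DIRECTORY | os.O_CLOEXEC)
    lockfile = getSourcePathLockFileName('dam/dammar.zip')   # hypothetical source path
    fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_CLOEXEC,
                 mode=0o664, dir_fd=lockdir_fd)
    flock(fd, LOCK_EX)        # blocks while webmap-import holds its shared locks
    # ... download/refresh the source file ...
    os.close(fd)              # closing the descriptor releases the flock
    os.close(lockdir_fd)
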
diff --git a/webmap-import b/webmap-import
index 1171851..5cb76d3 100755
--- a/webmap-import
+++ b/webmap-import
@@ -20,13 +20,15 @@
# pylint: disable=invalid-name, missing-module-docstring, fixme
-from os import O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC
+from os import O_RDONLY, O_WRONLY, O_CREAT, O_TRUNC, O_CLOEXEC, O_PATH, O_DIRECTORY
import os
+from stat import S_ISREG
import sys
-from fcntl import flock, LOCK_EX
+from fcntl import flock, LOCK_EX, LOCK_SH
import logging
import argparse
import re
+from datetime import datetime
from pathlib import Path
from typing import Any, Optional, NoReturn
import traceback
@@ -41,7 +43,12 @@ from osgeo.gdalconst import (
from osgeo import gdalconst
import common
-from common import BadConfiguration, escape_identifier
+from common import (
+ BadConfiguration,
+ escape_identifier,
+ escape_literal_str,
+ getSourcePathLockFileName
+)
from common_gdal import (
gdalVersionMin,
gdalGetMetadataItem,
@@ -57,7 +64,8 @@ from import_source import (
createOutputLayer,
validateOutputLayer,
clearLayer,
- importSources
+ importSources,
+ getSourceCacheKey
)
def setFieldIf(cond : bool,
@@ -276,6 +284,258 @@ def validate_sources(layers : dict[str, Any]) -> None:
for layername in toremove:
layers.pop(layername)
+def validateSourceCacheField(defn : ogr.FeatureDefn, idx : int,
+ name : str,
+ typ : int,
+ subtyp : int = ogr.OFSTNone,
+ width : int = 0,
+ isNullable : Optional[bool] = None) -> bool:
+ """Validate field #idx from the source cache layer/table."""
+ n = defn.GetFieldCount()
+ if idx >= n:
+ return False
+ defn = defn.GetFieldDefn(idx)
+
+ b = True
+ name2 = defn.GetName()
+ if name2 != name:
+ logging.warning('Source cache layer\'s field #%d has name "%s" != "%s", ignoring cache',
+ idx, name2, name)
+ b = False
+
+ if isNullable is not None and defn.IsNullable() != isNullable:
+ # non-fatal
+ logging.warning('Source cache layer\'s field #%d ("%s") %s nullable',
+ idx, name2, 'is' if defn.IsNullable() else 'is not')
+
+ typ2 = defn.GetType()
+ if typ2 != typ:
+ logging.warning('Source cache layer\'s field #%d ("%s") has type %s != %s, ignoring cache',
+ idx, name2,
+ ogr.GetFieldTypeName(typ2), ogr.GetFieldTypeName(typ))
+ b = False
+
+ subtyp2 = defn.GetSubType()
+ if subtyp2 != subtyp:
+ logging.warning('Source cache layer\'s field #%d ("%s") has subtype %s != %s, '
+ 'ignoring cache', idx, name2,
+ ogr.GetFieldSubTypeName(subtyp2), ogr.GetFieldSubTypeName(subtyp))
+ b = False
+
+ width2 = defn.GetWidth()
+ if width2 != 0 and (width == 0 or width2 < width):
+ # non-fatal
+ logging.warning('Source cache layer\'s field #%d ("%s") is too small (width %d < %d)',
+ idx, name2, width2, width)
+ return b
+
+def getSourceCacheLayer(ds : gdal.Dataset, name : str|None,
+ force : bool = False) -> ogr.Layer|None:
+ """Get the source cache layer/table and validate it."""
+ if name is None:
+ return None
+
+ lyr = ds.GetLayerByName(name)
+ if lyr is None:
+ if not force: # show a warning if args.force is not set
+ logging.warning('Table "%s" does not exist, implying --force', name)
+ return None
+
+# if not (lyr.TestCapability(ogr.OLCRandomWrite) and
+# gdalVersionMin(maj=3, min=9) and
+# lyr.TestCapability(ogr.OLCUpdateFeature)):
+# # TODO OLCUpdateFeature was added in 3.7 but we also want to use .GetDataset()
+# # which was added in 3.9
+# logging.warning('Layer "%s" does not support OLCUpdateFeature capability, '
+# 'ignoring cache', name)
+# return None
+
+ defn = lyr.GetLayerDefn()
+ fields = [
+ { 'name': 'source_path', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 2047 },
+ { 'name': 'archive_member', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 2047 },
+ { 'name': 'layername', 'typ': ogr.OFTString,
+ 'isNullable': False, 'width': 255 },
+ { 'name': 'mtime_ns', 'typ': ogr.OFTInteger64,
+ 'isNullable': True },
+ ]
+ m = len(fields)
+ n = defn.GetFieldCount()
+ if n < m:
+ logging.warning('Source cache layer/table "%s" has %d < %d fields, ignoring cache',
+ name, n, m)
+ elif n != m:
+ logging.warning('Source cache layer/table "%s" has %d != %d fields', name, n, m)
+ if not all(validateSourceCacheField(defn, i, **fld) for i,fld in enumerate(fields)):
+ return None
+
+ n = defn.GetGeomFieldCount()
+ if n > 0:
+ geomFieldNames = [ escape_identifier(defn.GetGeomFieldDefn(i).GetName())
+ for i in range(n) ]
+ logging.warning('Source cache layer/table "%s" has %d > 0 geometry field(s): %s',
+ name, n, ', '.join(geomFieldNames))
+
+ if gdalVersionMin(maj=3, min=5):
+ style = lyr.GetStyleTable()
+ if style is not None:
+ logging.warning('Source cache layer/table "%s" has a style table "%s"',
+ name, style.GetLastStyleName())
+ return lyr
+
+def getSourcesMtimeNS(sources : dict[str,Any],
+ cachedir : Optional[Path] = None) -> dict[str,int|None]:
+ """Return a dictionary mapping each source path to its last modification
+ time (in nanoseconds), or None if said source path is not a regular file."""
+ mtimes_ns = {}
+ for source in sources:
+ # the same source_path can be used multiple times, stat(2) only once
+ source_path = source['source']['path']
+ mtimes_ns[source_path] = None
+ for source_path in mtimes_ns:
+ path = source_path if cachedir is None else str(cachedir.joinpath(source_path))
+ try:
+ st = os.stat(path)
+ if not S_ISREG(st.st_mode):
+ raise FileNotFoundError
+ mtimes_ns[source_path] = st.st_mtime_ns
+ except (OSError, ValueError):
+ #logging.warning('Could not stat(%s)', path)
+ pass
+ return mtimes_ns
+
+def isSourceCacheDirtyTime(lyr : ogr.Layer, sources : dict[str,Any],
+ mtimes_ns : dict[str,int|None]) -> bool:
+ """Return a boolean indicating whether the source cache layer/table is up to
+ date with respect to the dictionary mapping each source path to its last
+ modification time. That is, every triplet (source_path, archive_member,
+ layername) needs to be present in cache, and the corresponding mtime_ns
+ needs to match the stat(2) output of the source file on disk."""
+ ks = set()
+ for source in sources:
+ source_path, archive_member, layername = k = getSourceCacheKey(source)
+ if k in ks:
+ raise BadConfiguration(f'Duplicate key {k}')
+ ks.add(k)
+
+ if len(ks) == 0:
+ return False
+
+ attributeFilter = []
+ for source_path, archive_member, layername in ks:
+ attributeFilter.append('(' + escape_literal_str(source_path) + ','
+ + escape_literal_str(archive_member) + ','
+ + escape_literal_str(layername) + ')')
+ if len(attributeFilter) == 1:
+ attributeFilter = '= ' + attributeFilter[0]
+ else:
+ attributeFilter = 'IN (' + ','.join(attributeFilter) + ')'
+ attributeFilter = '(source_path,archive_member,layername) ' + attributeFilter
+ logging.debug('SetAttributeFilter("%s", "%s")', lyr.GetName(), attributeFilter)
+ lyr.SetAttributeFilter(attributeFilter)
+
+ cache = {}
+ feature = lyr.GetNextFeature()
+ while feature is not None:
+ k = (
+ feature.GetFieldAsString(0) if feature.IsFieldSetAndNotNull(0) else None,
+ feature.GetFieldAsString(1) if feature.IsFieldSetAndNotNull(1) else None,
+ feature.GetFieldAsString(2) if feature.IsFieldSetAndNotNull(2) else None,
+ )
+ if any(k0 is None for k0 in k):
+ # no column in the key may be NULL
+ raise RuntimeError(f'Bad key {k}')
+ if k in cache:
+ raise RuntimeError(f'Duplicate key {k}')
+ cache[k] = feature.GetFieldAsInteger64(3) if feature.IsFieldSetAndNotNull(3) else None
+ feature = lyr.GetNextFeature()
+
+ for k in ks:
+ a = cache.get(k, None)
+ b = mtimes_ns[k[0]]
+ if a is None or b is None or a != b:
+ return True
+
+ for source_path in sorted({ p for p,_,_ in ks }):
+ # XXX datetime.fromtimestamp() doesn't support nanosecond input
+ # https://github.com/python/cpython/issues/59648
+ mtime = (mtimes_ns[source_path] // 1000)/1000000.
+ dt = datetime.fromtimestamp(mtime)
+ logging.info('Source file %s is unchanged (last modification time %s)',
+ source_path, dt.astimezone().isoformat(timespec='seconds'))
+ return False
+
+def lockSourcePaths(layerdefs : dict[str,Any], lockdir: str) -> dict[str,int]:
+ """Place shared locks on each source path and return their respective file
+ descriptors. We could do that one layerdef at a time (one output layer at a
+ time) to reduce the time during which the sources prevented from being
+ updated/downloaded, but their is some value in having consistency across the
+ whole import process."""
+ lockdir_fd = os.open(lockdir, O_RDONLY|O_CLOEXEC|O_PATH|O_DIRECTORY)
+ try:
+ ret = {}
+ for layerdef in layerdefs:
+ for source in layerdef['sources']:
+ source_path = source['source']['path']
+ lockfile = getSourcePathLockFileName(source_path)
+ lock_fd = os.open(lockfile, O_WRONLY|O_CREAT|O_CLOEXEC, mode=0o664,
+ dir_fd=lockdir_fd)
+ logging.debug('Acquiring shared lock for %s: flock("%s", LOCK_SH)',
+ source_path, lockfile)
+ flock(lock_fd, LOCK_SH)
+ ret[source_path] = lock_fd
+ return ret
+ finally:
+ try:
+ os.close(lockdir_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not close lockdir')
+
+def releaseSourcePathLocks(lock_fds : dict[str,int]) -> None:
+ """Release shared locks on the source paths. Closed FDs are removed from
+ the dictionary in place."""
+ toremove = set()
+ for path, lock_fd in lock_fds.items():
+ try:
+ os.close(lock_fd)
+ except (OSError, ValueError):
+ logging.exception('Could not release lock for %s', path)
+ else:
+ logging.debug('Released lock for %s', path)
+ toremove.add(path)
+ for path in toremove:
+ lock_fds.pop(path)
+
+def commitOrRollbackTransactionIf(obj : gdal.Dataset|ogr.Layer, commit : bool) -> bool:
+ """Either commit or rollback the transaction, depending on a condition
+ indicating whether there are pending new changes to be committed."""
+ if commit:
+ logging.debug('Committing transaction')
+ if obj.CommitTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not commit transaction')
+ else:
+ logging.info('No changes detected, rolling back transaction')
+ # don't call rollbackTransactionTry() as we don't want to catch exceptions
+ if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not rollback transaction')
+ return False
+
+def rollbackTransactionTry(obj : gdal.Dataset|ogr.Layer) -> bool:
+ """Try to rollback the current transaction on the dataset or layer, ignoring
+ any exception but logging errors and returning a boolean indicating
+ success."""
+ try:
+ if obj.RollbackTransaction() == ogr.OGRERR_NONE:
+ return True
+ logging.error('Could not rollback transaction')
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not rollback transaction')
+ return False
+
# pylint: disable-next=missing-function-docstring, too-many-branches, too-many-statements
def main() -> NoReturn:
common.init_logger(app=os.path.basename(__file__), level=logging.INFO)
@@ -286,7 +546,11 @@ def main() -> NoReturn:
parser.add_argument('--debug', action='count', default=0,
help=argparse.SUPPRESS)
parser.add_argument('--lockfile', default=None,
- help='obtain an exclusive lock before starting unpacking and importing')
+ help='obtain an exclusive lock before processing')
+ parser.add_argument('--lockdir-sources', default=None,
+ help='optional directory for lock files protecting source paths')
+ parser.add_argument('--force', default=False, action='store_true',
+ help='import even if no new changes were detected')
parser.add_argument('groupname', nargs='*', help='group layer name(s) to process')
args = parser.parse_args()
@@ -313,8 +577,8 @@ def main() -> NoReturn:
dso = openOutputDS(config['dataset'])
validate_schema(layers,
- drvo=dso.GetDriver(),
- lco_defaults=config['dataset'].get('create-layer-options', None))
+ drvo=dso.GetDriver(),
+ lco_defaults=config['dataset'].get('create-layer-options', None))
# get configured Spatial Reference System and extent
srs = getSRS(config.get('SRS', None))
@@ -340,6 +604,11 @@ def main() -> NoReturn:
createOutputLayer(dso, layername, srs=srs, options=layerdef.get('create', None))
cachedir = Path(args.cachedir) if args.cachedir is not None else None
+ if args.lockdir_sources is None:
+ sourcePathLocks = None
+ else:
+ sourcePathLocks = lockSourcePaths(layerdefs=layers.values(),
+ lockdir=args.lockdir_sources)
if (dso.TestCapability(ogr.ODsCTransactions) and
dso.GetDriver().ShortName in ('PostgreSQL', 'SQLite', 'GPKG')):
@@ -350,6 +619,12 @@ def main() -> NoReturn:
dso.GetDriver().ShortName)
dsoTransaction = False
+ # get source cache layer/table
+ lyr_sourcecache = getSourceCacheLayer(dso,
+ config['dataset'].get('sourcecache-layername', None),
+ force=args.force)
+
+ bChanged = False
rv = 0
try:
for layername, layerdef in layers.items():
@@ -360,9 +635,18 @@ def main() -> NoReturn:
if not lyr.TestCapability(ogr.OLCSequentialWrite):
raise RuntimeError(f'Output layer "{layername}" has no working '
'CreateFeature() method')
- validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
sources = layerdef['sources']
+ if lyr_sourcecache is None:
+ sourcesMtimeNS = None
+ else:
+ sourcesMtimeNS = getSourcesMtimeNS(sources, cachedir=cachedir)
+ if not (args.force or isSourceCacheDirtyTime(lyr_sourcecache, sources,
+ mtimes_ns=sourcesMtimeNS)):
+ logging.info('Output layer "%s" is up to date, skipping', layername)
+ continue
+
+ validateOutputLayer(lyr, srs=srs, options=layerdef['create'])
# setup output field mapping in the sources dictionary
setOutputFieldMap(lyr.GetLayerDefn(), sources)
@@ -381,36 +665,44 @@ def main() -> NoReturn:
lyrTransaction = False
try:
- clearLayer(dso, lyr)
+ clearLayer(dso, lyr) # TODO conditional (only if not new)?
description = layerdef.get('description', None)
if (description is not None and
lyr.SetMetadataItem('DESCRIPTION', description) != GDAL_CE_None):
logging.warning('Could not set description metadata')
- importSources(lyr, sources=sources, cachedir=cachedir, extent=extent)
+ bChanged0 = importSources(lyr_dst=lyr, dso=dso, sources=sources,
+ mtimes=sourcesMtimeNS,
+ lyr_sourcecache=lyr_sourcecache,
+ cachedir=cachedir, extent=extent)
+ bChanged = bChanged or bChanged0
- if isinstance(lyrTransaction, bool) and lyrTransaction:
- # commit transaction
- logging.debug('Committing transaction')
+ if isinstance(lyrTransaction, str):
+ if not (bChanged0 or args.force):
+ logging.info('No changes detected, rolling back to previous savepoint')
+ query = 'ROLLBACK TO ' + lyrTransaction
+ logging.debug(query)
+ try:
+ dso.ExecuteSQL(query)
+ except Exception: # pylint: disable=broad-exception-caught
+ logging.exception('Could not %s', query)
+ rv = 1
+ elif isinstance(lyrTransaction, bool) and lyrTransaction:
lyrTransaction = False
- if lyr.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
+ if not commitOrRollbackTransactionIf(lyr, commit=bChanged0 or args.force):
rv = 1
except Exception: # pylint: disable=broad-exception-caught
if isinstance(lyrTransaction, str):
query = 'ROLLBACK TO ' + lyrTransaction
- logging.exception('Exception occured in transaction, %s', query)
+ logging.exception('Exception occurred within transaction, '
+ 'rolling back to previous savepoint')
logging.debug(query)
dso.ExecuteSQL(query)
elif isinstance(lyrTransaction, bool) and lyrTransaction:
- logging.exception('Exception occured in transaction, rolling back')
- try:
- if lyr.RollbackTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
+ logging.exception('Exception occurred within transaction, rolling back')
+ rollbackTransactionTry(lyr)
else:
traceback.print_exc()
rv = 1
@@ -422,29 +714,28 @@ def main() -> NoReturn:
logging.debug(query)
dso.ExecuteSQL(query)
+ if sourcePathLocks is not None:
+ releaseSourcePathLocks(sourcePathLocks)
+
if dsoTransaction:
# commit transaction
- logging.debug('Committing transaction')
dsoTransaction = False
- if dso.CommitTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not commit transaction')
+ if not commitOrRollbackTransactionIf(dso, commit=bChanged or args.force):
rv = 1
except Exception: # pylint: disable=broad-exception-caught
if dsoTransaction:
- logging.exception('Exception occured in transaction, rolling back')
- try:
- if dso.RollbackTransaction() != ogr.OGRERR_NONE:
- logging.error('Could not rollback transaction')
- except RuntimeError:
- logging.exception('Could not rollback transaction')
+ logging.exception('Exception occurred within transaction, rolling back')
+ rollbackTransactionTry(dso)
else:
traceback.print_exc()
- rv = 1
+ sys.exit(1)
- dso = None
- srs = None
- extent = None
+ finally:
+ lyr_sourcecache = None
+ dso = None
+ srs = None
+ extent = None
sys.exit(rv)
gdal.UseExceptions()
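As noted in isSourceCacheDirtyTime(), datetime.fromtimestamp() has no nanosecond input (cpython issue 59648), so mtime_ns is truncated to microseconds before formatting. A small worked example (the timestamp value is made up):

    from datetime import datetime

    mtime_ns = 1717171717_123456789                 # hypothetical st_mtime_ns
    mtime = (mtime_ns // 1000) / 1000000.           # 1717171717.123456 s, microsecond precision
    print(datetime.fromtimestamp(mtime).astimezone().isoformat(timespec='seconds'))
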