Skip to content

Instantly share code, notes, and snippets.

@arbakker
Last active August 12, 2023 15:22
Show Gist options
  • Save arbakker/a773f79586c47d7f2eb9f10c98134e5e to your computer and use it in GitHub Desktop.
Save arbakker/a773f79586c47d7f2eb9f10c98134e5e to your computer and use it in GitHub Desktop.
QGIS processing tool for joining PDOK AHN3 WCS with point data

PDOK AHN3 WCS QGIS Processing Tool

QGIS processing tools for joining PDOK AHN3 WCS (elevation) data with point data.

Tested with QGIS version 3.18, will most likely work with all QGIS version 3.X.

Installation

  1. Download pdok-ahn3-wcs-tool.py
  2. Open the QGIS Processing Toolbox and click Add Script to Toolbox... and browse to the downloaded pdok-ahn3-wcs-tool.py file:

  1. Now the PDOK AHN3 WCS Tool tool should be available in your toolbox:

  1. Start using the PDOK AHN3 WCS Tool:

⚠️ Note that for each feature 1 HTTP request is issue to the WCS service. For large point datasets it will be more efficient to download the full coverage for your area of interest instead and perform the join with other tooling.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""pdok-ahn3-wcs-tool.py: QGIS Processing tool for joining elevation to point data \
using the PDOK AHN3 WCS. Tested with QGIS version 3.18, but will probably work with any \
QGIS 3.X version."""
# MIT License
# Copyright (c) 2021 Anton Bakker
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__author__ = "Anton Bakker"
__copyright__ = "Copyright 2021, Anton Bakker"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Anton Bakker"
__email__ = "anton.bakker@kadaster.nl"
__date__ = "2021-03-14"
import os
import uuid
import re
import struct
import traceback
import tempfile
import requests
from math import floor
import email.parser
from osgeo import gdal, ogr
from requests.structures import CaseInsensitiveDict
from owslib.wcs import WebCoverageService
from qgis.PyQt.QtCore import QCoreApplication, QVariant
from qgis.core import (
QgsProject,
QgsProcessing,
QgsField,
QgsGeometry,
QgsPointXY,
QgsWkbTypes,
QgsFeature,
QgsUnitTypes,
QgsFeatureSink,
QgsCoordinateReferenceSystem,
QgsCoordinateTransform,
QgsProcessingAlgorithm,
QgsProcessingException,
QgsProcessingParameterDistance,
QgsProcessingParameterCrs,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterEnum,
QgsProcessingParameterBoolean,
QgsProcessingParameterString,
QgsProcessingParameterNumber,
QgsProcessingParameterFeatureSink,
QgsProcessingParameterField,
)
from qgis import processing
def get_boundary(response):
pattern = b"^\r\n(--.*)\r\n"
m = re.search(pattern, response)
if m:
return m.group(1)
return ""
def split_on_find(content, bound):
point = content.find(bound)
return content[:point], content[point + len(bound) :]
def encode_with(string, encoding):
if not (string is None or isinstance(string, bytes)):
return string.encode(encoding)
return string
def header_parser(string, encoding):
string = string.decode(encoding)
headers = email.parser.HeaderParser().parsestr(string).items()
return ((encode_with(k, encoding), encode_with(v, encoding)) for k, v in headers)
def parse_response(content):
encoding = "utf-8"
sep = get_boundary(content)
parts = content.split(b"".join((b"\r\n", sep)))
parts = parts[1:-1]
result = []
for part in parts:
if b"\r\n\r\n" in part:
first, body = split_on_find(part, b"\r\n\r\n")
headers = header_parser(first.lstrip(), encoding)
headers = CaseInsensitiveDict(headers)
item = {}
item["headers"] = headers
item["content"] = body
result.append(item)
return result
class PDOKWCSTool(QgsProcessingAlgorithm):
""""""
USER_AGENT_HEADER = {"User-Agent": "qgis-pdok-processing-tools"}
def tr(self, string):
"""
Returns a translatable string with the self.tr() function.
"""
return QCoreApplication.translate("Processing", string)
def createInstance(self):
# Must return a new copy of your algorithm.
return PDOKWCSTool()
def name(self):
"""
Returns the unique algorithm name.
"""
return "pdok-ahn3-wcs-tool"
def displayName(self):
"""
Returns the translated algorithm name.
"""
return self.tr("PDOK AHN3 WCS Tool")
def group(self):
"""
Returns the name of the group this algorithm belongs to.
"""
return self.tr("PDOK Tools")
def groupId(self):
"""
Returns the unique ID of the group this algorithm belongs
to.
"""
return "pdok-tools"
def shortHelpString(self):
"""
Returns a localised short help string for the algorithm.
"""
return self.tr(
'This processing tool retrieves elevation data from the AHN3 WCS \
for each point in the point input layer. The output is a point layer with \
the joined elevation attribute. \
Parameters:\n\n\
- Input point layer\n\
- CoverageId: type of coverage to query, see the <a href="https://www.ahn.nl/kwaliteitsbeschrijving">\
AHN documentation</a>\n\
- Attribute name: name of attribution to store elevation data in\n\
- Target CRS: CRS of the resulting output layer\n\
- Output layer: resulting output layer\n\
'
)
def initAlgorithm(self, config=None):
"""
Here we define the inputs and outputs of the algorithm.
"""
self.wcs_url = "https://geodata.nationaalgeoregister.nl/ahn3/wcs"
self.wcs = WebCoverageService(self.wcs_url, version="2.0.1")
_coverages = list(self.wcs.contents.keys())
self.coverages = [(item, self.tr(item)) for item in _coverages]
self.INPUT = "INPUT" # recommended name for the main input parameter
self.OUTPUT = "OUTPUT" # recommended name for the main output parameter
self.TARGET_CRS = "TARGET_CRS"
self.ATTRIBUTE_NAME = "ATTRIBUTE_NAME"
self.COVERAGE_ID = "COVERAGE_ID"
self.addParameter(
QgsProcessingParameterFeatureSource(
self.INPUT,
self.tr("Input point layer"),
types=[QgsProcessing.TypeVectorPoint],
)
)
self.addParameter(
QgsProcessingParameterEnum(
self.COVERAGE_ID,
self.tr("CoverageId"),
options=[p[1] for p in self.coverages],
defaultValue=0,
optional=False,
)
)
self.addParameter(
QgsProcessingParameterString(
self.ATTRIBUTE_NAME,
self.tr("Attribute name"),
defaultValue="elevation",
optional=True,
)
),
self.addParameter(
QgsProcessingParameterFeatureSink(self.OUTPUT, self.tr("Output layer"))
)
self.addParameter(
QgsProcessingParameterCrs(
self.TARGET_CRS, self.tr("Target CRS"), "EPSG:4326"
)
)
def get_ahn_val(self, x, y, coverage_id, feedback):
origin = [float(i) for i in self.wcs.contents[coverage_id].grid.origin]
cell_size = float(self.wcs.contents[coverage_id].grid.offsetvectors[0][0])
x_lower_bound = origin[0] + (((x - origin[0]) // cell_size) * cell_size)
x_upper_bound = x_lower_bound + (2 * cell_size)
y_lower_bound = origin[1] + (((y - origin[1]) // cell_size) * cell_size)
y_upper_bound = y_lower_bound + (2 * cell_size)
url = f"{self.wcs_url}?service=WCS&Request=GetCoverage&version=2.0.1&CoverageId={coverage_id}&format=image/tiff&subset=x({x_lower_bound},{x_upper_bound})&subset=y({y_lower_bound},{y_upper_bound})"
feedback.pushInfo(f"url: {url}")
response = requests.get(url)
multipart_data = parse_response(response.content)
for part in multipart_data:
if part["headers"][b"content-type"] == b"image/tiff":
coverage = part["content"]
uuid_string = str(uuid.uuid4())
tif_file_name = f"/vsimem/{uuid_string}.tif"
gdal.UseExceptions()
gdal.FileFromMemBuffer(tif_file_name, coverage)
ds = gdal.Open(tif_file_name)
band = ds.GetRasterBand(1)
gt = ds.GetGeoTransform()
px = floor((x - gt[0]) / gt[1])
py = floor((y - gt[3]) / gt[5])
structval = band.ReadRaster(px, py, 1, 1, buf_type=gdal.GDT_Float32)
floatval = struct.unpack("f", structval)
return floatval[0]
def processAlgorithm(self, parameters, context, feedback):
try:
# read out parameters
input_layer = self.parameterAsVectorLayer(parameters, self.INPUT, context)
out_crs = parameters[self.TARGET_CRS]
attribute_name = parameters[self.ATTRIBUTE_NAME]
coverage_id = [
self.coverages[i][0]
for i in self.parameterAsEnums(parameters, self.COVERAGE_ID, context)
][0]
# start processing
fields = input_layer.fields()
fields.append(QgsField(attribute_name, QVariant.Double))
field_names = [field.name() for field in fields]
(sink, dest_id) = self.parameterAsSink(
parameters,
self.OUTPUT,
context,
fields,
input_layer.wkbType(),
out_crs,
)
if feedback.isCanceled():
return {}
crs_uri = self.wcs.contents[coverage_id].boundingboxes[0]["nativeSrs"]
feedback.pushInfo(f"crs_uri: {crs_uri}")
in_crs = input_layer.crs()
if in_crs.authid() != "EPSG:28992":
rd_crs = QgsCoordinateReferenceSystem("EPSG:28992")
transform_input = QgsCoordinateTransform(
in_crs, rd_crs, QgsProject.instance()
)
for feature in input_layer.getFeatures():
geom = feature.geometry()
if in_crs.authid() != "EPSG:28992":
geom.transform(transform_input)
point_geom = QgsGeometry.asPoint(geom)
point_xy = QgsPointXY(point_geom)
x = point_xy.x()
y = point_xy.y()
attrs = feature.attributes()
new_ft = QgsFeature(fields)
for i in range(len(attrs)):
attr = attrs[i]
field_name = field_names[i]
new_ft.setAttribute(field_name, attr)
ahn_val = self.get_ahn_val(x, y, coverage_id, feedback)
# TODO: retrieve NODATA val from WCS service
if ahn_val == 3.4028234663852886e38:
ahn_val = None
new_ft.setAttribute(attribute_name, ahn_val)
if out_crs.authid() != in_crs.authid():
transform = QgsCoordinateTransform(
in_crs, out_crs, QgsProject.instance()
)
geom.transform(transform)
new_ft.setGeometry(geom)
sink.addFeature(new_ft, QgsFeatureSink.FastInsert)
if feedback.isCanceled():
return {}
results = {}
results[self.OUTPUT] = dest_id
return results
except Exception as e:
toolname = type(self).__name__
raise QgsProcessingException(
f"Unexpected error occured while running {toolname}: {e}"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment