Last active
February 16, 2023 22:47
-
-
Save moosetraveller/6e1a117ee86c0df5f4c53bc6cc920d97 to your computer and use it in GitHub Desktop.
QGIS Processing Tool splitting Polygons along Antimeridian
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
The following QGIS tool corrects polygons that are overlapping | |
the antimeridian by creating multipolygons with two parts on | |
both sides of the antimeridian. | |
Limitations: | |
- The geometry must be WGS 84 (EPSG:4326). | |
- Polygons must not have holes. | |
Dependencies: | |
- QGIS 3.22+ and PyQGIS | |
- shapely | |
:author: | |
Thomas Zuberbuehler | |
""" | |
import re | |
from qgis.PyQt.QtCore import QCoreApplication | |
from qgis.core import ( | |
QgsGeometry, | |
QgsPointXY, | |
QgsVectorLayer, | |
QgsProcessing, | |
QgsProcessingParameterVectorLayer, | |
QgsProcessingParameterString, | |
QgsProcessingAlgorithm, | |
QgsFeature, | |
QgsProcessingParameterFeatureSink, | |
QgsFeatureSink, | |
) | |
from shapely import wkt | |
from shapely.geometry import Polygon, MultiPolygon | |
from qgis import processing | |
from qgis.core.additions.edit import edit | |
from typing import List | |
ANTI_MERIDIAN = QgsGeometry.fromPolylineXY([ | |
QgsPointXY(180, 90), | |
QgsPointXY(180, -90), | |
]).buffer(0.0000001, 0) | |
def _pre_transform(x): | |
if x < 0: | |
return x + 180 + 180 | |
return x | |
def _post_transform(x): | |
if x > 180: | |
return x - 180 - 180 | |
return x | |
def update_geometry(geom: Polygon, transform_x) -> QgsGeometry: | |
exterior = [] | |
for coord in geom.exterior.coords: | |
x, y = transform_x(coord[0]), coord[1] | |
exterior.append((x, y)) | |
interiors = [] | |
for i, interior in enumerate(geom.interiors): | |
interiors.append([]) | |
for coord in interior.coords: | |
x, y = transform_x(coord[0]), coord[1] | |
interiors[i].append((x, y)) | |
return Polygon(exterior, interiors) | |
def load_wkt(wkt_string): | |
try: | |
return wkt.loads(wkt_string) | |
except: | |
match = re.search("Polygon[ ]?[(][(](?P<first_point>[0-9. -e]+?)[,]", wkt_string) | |
first_point = match.group("first_point") | |
wkt_string = f"{wkt_string[:-2]}, {first_point}))" | |
return wkt.loads(wkt_string) | |
def adjust_geometries(geom, feedback): | |
wkt_string = geom.asWkt() # using shapely objects, qgs objects are hell | |
geom = load_wkt(wkt_string) | |
if isinstance(geom, MultiPolygon): | |
polygons = [] | |
for polygon in geom.geoms: | |
updated_polygon = update_geometry(polygon, _pre_transform) | |
polygons.append(updated_polygon) | |
return QgsGeometry.fromWkt(MultiPolygon(polygons).wkt) | |
return QgsGeometry.fromWkt(update_geometry(geom, _pre_transform).wkt) | |
def separate_geometries(geom): | |
wkt_string = geom.asWkt() # using shapely objects, qgs objects are hell | |
geom = load_wkt(wkt_string) | |
if isinstance(geom, MultiPolygon): | |
polygons = [] | |
for polygon in geom.geoms: | |
updated_polygon = update_geometry(polygon, _post_transform) | |
polygons.append(updated_polygon) | |
return QgsGeometry.fromWkt(MultiPolygon(polygons).wkt) | |
return QgsGeometry.fromWkt(geom.wkt) | |
def correct_geometries(layer, control_meridians, context, feedback): | |
meridians = [ | |
QgsGeometry.fromPolylineXY([ | |
QgsPointXY(m, 90), | |
QgsPointXY(m, -90), | |
]) | |
for m in control_meridians | |
] | |
feedback.pushInfo("Create working layer...") | |
result_layer = QgsVectorLayer("Polygon?crs=epsg:4326", "Corrected", "memory") | |
features = [feature for feature in layer.getFeatures()] | |
data_provider = result_layer.dataProvider() | |
attributes = layer.dataProvider().fields().toList() | |
data_provider.addAttributes(attributes) | |
result_layer.updateFields() | |
data_provider.addFeatures(features) | |
feedback.pushInfo("Correct geometries...") | |
modified = [] | |
with edit(result_layer): | |
for feature in result_layer.getFeatures(): | |
geom = feature.geometry() | |
is_candidate = all([ | |
geom.crosses(m) for m in meridians | |
]) | |
if is_candidate: | |
modified.append(feature.id()) | |
geom = adjust_geometries(geom, feedback) | |
result_layer.changeGeometry(feature.id(), geom) | |
feedback.pushInfo("Split geometries...") | |
overlay_layer = QgsVectorLayer("Polygon?crs=epsg:4326", "Overlay", "memory") | |
overlay_data_provider = overlay_layer.dataProvider() | |
overlay_feature = QgsFeature() | |
overlay_feature.setGeometry(ANTI_MERIDIAN) | |
overlay_data_provider.addFeatures([overlay_feature]) | |
overlay_layer.updateExtents() | |
result = processing.run( | |
"native:difference", | |
{ | |
"INPUT": result_layer, | |
"OVERLAY": overlay_layer, | |
"OUTPUT": "memory:splitted" | |
}, | |
context=context, | |
feedback=feedback, | |
) | |
result_layer = result['OUTPUT'] | |
feedback.pushInfo("Separate geometries...") | |
with edit(result_layer): | |
for feature in result_layer.getFeatures(): | |
if feature.id() not in modified: | |
continue | |
geom = feature.geometry() | |
geom = separate_geometries(geom) | |
result_layer.changeGeometry(feature.id(), geom) | |
return result_layer | |
class CalculateDistance(QgsProcessingAlgorithm): | |
LAYER = "LAYER" | |
CONTROL_MERIDIANS = "CONTROL_MERIDIANS" | |
OUTPUT = "OUTPUT" | |
def __init__(self): | |
super().__init__() | |
def tr(self, string): | |
return QCoreApplication.translate("Processing", string) | |
def name(self): | |
return "CorrectGeometriesOverlappingAntiMeridian" | |
def displayName(self): | |
return "Correct Geometries Overlapping Antimeridian" | |
def group(self): | |
return "Geomo Tools" | |
def groupId(self): | |
return "geomo" | |
def createInstance(self): | |
return type(self)() | |
def initAlgorithm(self, config=None): | |
self.addParameter( | |
QgsProcessingParameterVectorLayer( | |
self.LAYER, | |
self.tr("Layer"), | |
[QgsProcessing.TypeVectorPolygon], | |
optional=False, | |
) | |
) | |
self.addParameter( | |
QgsProcessingParameterString( | |
self.CONTROL_MERIDIANS, | |
self.tr("Control Meridians"), | |
defaultValue="-100,0,100", | |
optional=False, | |
) | |
) | |
self.addParameter( | |
QgsProcessingParameterFeatureSink( | |
self.OUTPUT, | |
self.tr("Output Layer"), | |
) | |
) | |
def processAlgorithm(self, parameters, context, feedback): | |
input_layer = self.parameterAsVectorLayer(parameters, self.LAYER, context) | |
control_meridians = [ | |
float(m) for m in self.parameterAsString( | |
parameters, | |
self.CONTROL_MERIDIANS, | |
context | |
).split(",") | |
] | |
result_layer = correct_geometries( | |
input_layer, | |
control_meridians, | |
context, | |
feedback, | |
) | |
fields = input_layer.fields() | |
(sink, sink_destination_id) = self.parameterAsSink( | |
parameters, | |
self.OUTPUT, | |
context, | |
fields, | |
input_layer.wkbType(), | |
input_layer.sourceCrs(), | |
) | |
feedback.pushInfo("Prepare Output ...") | |
for f in result_layer.getFeatures(): | |
feature = QgsFeature() | |
feature.setGeometry(f.geometry()) | |
feature.setAttributes(f.attributes()) | |
sink.addFeature(feature, QgsFeatureSink.FastInsert) | |
return {self.OUTPUT: sink_destination_id} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See the repository here: