Skip to content

Instantly share code, notes, and snippets.

@jfbourdon
Created May 17, 2022 18:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jfbourdon/e0010a15e829ec61d21106bfabd38bf8 to your computer and use it in GitHub Desktop.
Save jfbourdon/e0010a15e829ec61d21106bfabd38bf8 to your computer and use it in GitHub Desktop.
Establish stream priority in network from line direction
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import *
from qgis.utils import pluginMetadata
from PyQt5.QtCore import *
import processing
from processing.core.Processing import Processing
from qgis.analysis import QgsNativeAlgorithms
import os
from collections import deque
from math import sqrt
# valyer_streams must be a QgsVectorLayer
# tolerance is defined in distance unit
def stream_priority(vlayer_streams, tolerance=0.001):
# Check for an input of LINESTRING type only
if vlayer_streams.wkbType() not in [2, 1002, 2002, 3002]:
print("Only LINESTRING are accepted.")
quit()
# Add a unique FID in a new field
# The null reprojection is only to create a temporary layer and
# to make sure that the FIDs start at 1
new_fid_field="FID_ori"
vlayer_priority = processing.run("native:reprojectlayer", {
'INPUT':vlayer_streams,
'TARGET_CRS':vlayer_streams.crs(),
'OUTPUT':'TEMPORARY_OUTPUT'
})["OUTPUT"]
with edit(vlayer_priority):
vlayer_priority.addAttribute(QgsField(new_fid_field, QVariant.Int))
idx_fid = vlayer_priority.fields().indexFromName(new_fid_field)
for feature in vlayer_priority.getFeatures():
vlayer_priority.changeAttributeValue(feature.id(), idx_fid, feature.id())
# Extract separately starting and ending vertices from each segment
vlayer_vertices_start = processing.run("native:extractspecificvertices", {
'INPUT':vlayer_priority,
'VERTICES':'0',
'OUTPUT':'TEMPORARY_OUTPUT'
})["OUTPUT"]
vlayer_vertices_end = processing.run("native:extractspecificvertices", {
'INPUT':vlayer_priority,
'VERTICES':'-1',
'OUTPUT':'TEMPORARY_OUTPUT'
})["OUTPUT"]
# Prepare core objects for the work to come
# Dictionaries, set, deque and indexes are used for maximum performance
request = QgsFeatureRequest().setSubsetOfAttributes([new_fid_field], vlayer_vertices_start.fields())
dict_starts = {feature[new_fid_field]:[feature.geometry().constGet(), 0] for feature in vlayer_vertices_start.getFeatures(request)}
dict_ends = {feature[new_fid_field]:[feature.geometry().constGet(), 0] for feature in vlayer_vertices_end.getFeatures(request)}
index_start = QgsSpatialIndex(vlayer_vertices_start)
index_end = QgsSpatialIndex(vlayer_vertices_end)
remaining_fids = set(dict_starts.keys())
deck = deque(maxlen=len(dict_starts))
# Check if no pair of endpoints are closer than the tolerance value
tolerance_failed = False
for key, pt_start in dict_starts.items():
pt_end = dict_ends[key][0]
dist = sqrt((pt_start[0].x() - pt_end.x())**2
+ (pt_start[0].y() - pt_end.y())**2)
if dist <= tolerance:
tolerance_failed = True
print(f"Stream located around x:{round(pt_start[0].x(), 1)} y:{round(pt_start[0].y(), 1)} has its ends closer than the tolerance value ({tolerance}).")
if tolerance_failed:
quit()
# Main loop executed as long as some FIDs haven't been met
while remaining_fids:
# Start the search with a random available FID
upstream_fid = remaining_fids.pop()
# Going downstream until there is nowhere to go
while not len(deck):
# Find immediate downstreams FIDs
end_coord = dict_ends[upstream_fid][0]
end_coord_x = end_coord.x()
end_coord_y = end_coord.y()
rectangle = QgsRectangle(
end_coord_x - tolerance,
end_coord_y - tolerance,
end_coord_x + tolerance,
end_coord_y + tolerance
)
downstream_fids = index_start.intersects(rectangle)
# Get the first FID found as a seed to continue going downstream
# The hypothesis here is that a stream network is always converging
# and never diverging
if len(downstream_fids) > 0:
upstream_fid = downstream_fids[0]
else:
deck.append(upstream_fid)
# Going upstream until there is nowhere to go
while len(deck):
downstream_fid = deck.popleft()
remaining_fids.discard(downstream_fid)
# Increment priority based on the common downstream FID
priority = dict_starts[downstream_fid][1] + 1
# Find immediate upstreams FIDs
start_coord = dict_starts[downstream_fid][0]
start_coord_x = start_coord.x()
start_coord_y = start_coord.y()
rectangle = QgsRectangle(
start_coord_x - tolerance,
start_coord_y - tolerance,
start_coord_x + tolerance,
start_coord_y + tolerance
)
upstream_fids = index_end.intersects(rectangle)
# Add the upstream FIDs found to the deck and
# update their priority value
for upstream_fid in upstream_fids:
dict_starts[upstream_fid][1] = priority
deck.append(upstream_fid)
# Add two priority fields to a copy of the input "vlayer_streams" QgsVectorLayer
priority_max = max([value[1] for key, value in dict_starts.items()])
priority_rev = list(range(priority_max, -1, -1))
with edit(vlayer_priority):
vlayer_priority.addAttribute(QgsField("PRIORITY_UPSTREAM", QVariant.Int))
vlayer_priority.addAttribute(QgsField("PRIORITY_DOWNSTREAM", QVariant.Int))
idx_up = vlayer_priority.fields().indexFromName("PRIORITY_UPSTREAM")
idx_down = vlayer_priority.fields().indexFromName("PRIORITY_DOWNSTREAM")
idx_fid = vlayer_priority.fields().indexFromName(new_fid_field)
for feature in vlayer_priority.getFeatures():
priority_up = dict_starts[feature[new_fid_field]][1]
vlayer_priority.changeAttributeValue(feature.id(), idx_up, priority_up)
vlayer_priority.changeAttributeValue(feature.id(), idx_down, priority_rev[priority_up])
vlayer_priority.deleteAttribute(idx_fid)
return(vlayer_priority)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment