Last active
October 16, 2021 20:21
-
-
Save ghtmtt/3be08a6126bafe100ba859e2396be087 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from PyQt5.QtCore import QCoreApplication, QVariant | |
from qgis.core import (QgsProcessing, | |
QgsFeatureSink, | |
QgsField, | |
QgsFields, | |
QgsProcessingUtils, | |
QgsProcessingException, | |
QgsProcessingAlgorithm, | |
QgsProcessingParameterFeatureSource, | |
QgsProcessingParameterFeatureSink, | |
QgsProcessingParameterField, | |
QgsProcessingParameterRasterLayer, | |
QgsVectorLayer, | |
QgsProcessingParameterNumber, | |
QgsProcessingParameterVectorDestination) | |
import processing | |
import os | |
class Visibility(QgsProcessingAlgorithm): | |
INPUT = 'INPUT' | |
FIELD = 'FIELD' | |
DTM = 'DTM' | |
DISTANCE = 'DISTANCE' | |
OFFSET = 'OFFSET' | |
ID = 'ID' | |
OUTPUT = 'OUTPUT' | |
def tr(self, string): | |
return QCoreApplication.translate('Processing', string) | |
def createInstance(self): | |
return Visibility() | |
def name(self): | |
return 'campo' | |
def displayName(self): | |
return self.tr('Campo') | |
def group(self): | |
return self.tr('Example scripts') | |
def groupId(self): | |
return 'examplescripts' | |
def initAlgorithm(self, config=None): | |
# input POINTS | |
self.addParameter( | |
QgsProcessingParameterFeatureSource( | |
self.INPUT, | |
self.tr('Input layer'), | |
[QgsProcessing.TypeVectorAnyGeometry] | |
) | |
) | |
# Elevation from POINT FIELD | |
self.addParameter( | |
QgsProcessingParameterField( | |
self.FIELD, | |
self.tr("Elevation"), | |
'h', | |
self.INPUT | |
) | |
) | |
# DTM | |
self.addParameter( | |
QgsProcessingParameterRasterLayer( | |
self.DTM, | |
self.tr("DTM") | |
) | |
) | |
# DISTANCE as numerical parameter | |
self.addParameter( | |
QgsProcessingParameterNumber( | |
self.DISTANCE, | |
self.tr("Distance (radius)"), | |
QgsProcessingParameterNumber.Double, | |
-1, | |
False, | |
-1, | |
100000 | |
) | |
) | |
# TARGET ELEVATION as numerical parameter | |
self.addParameter( | |
QgsProcessingParameterNumber( | |
self.OFFSET, | |
self.tr("Target Elevation"), | |
QgsProcessingParameterNumber.Double, | |
0, | |
False, | |
0, | |
100000 | |
) | |
) | |
# ID from POINT Field | |
self.addParameter( | |
QgsProcessingParameterField( | |
self.ID, | |
self.tr("Point ID"), | |
'id', | |
self.INPUT | |
) | |
) | |
# VISIBLE Points as Multipolygon | |
self.addParameter( | |
QgsProcessingParameterVectorDestination( | |
self.OUTPUT, | |
self.tr('Visible Points') | |
) | |
) | |
def processAlgorithm(self, parameters, context, feedback): | |
""" | |
Here is where the processing itself takes place. | |
""" | |
# Dummy function for thread safe mode | |
def dummy(alg, context, feedback): | |
pass | |
# Input POINTS as source | |
source = self.parameterAsVectorLayer( | |
parameters, | |
self.INPUT, | |
context | |
) | |
if source is None: | |
raise QgsProcessingException(self.invalidSourceError(parameters, self.INPUT)) | |
# Points ELEVATION from FIELD | |
elevation = self.parameterAsFields( | |
parameters, | |
self.FIELD, | |
context | |
)[0] | |
# RASTER as dtm | |
dtm = self.parameterAsRasterLayer( | |
parameters, | |
self.DTM, | |
context | |
) | |
# DISTANCE radius as numerical parameter | |
distance = self.parameterAsDouble( | |
parameters, | |
self.DISTANCE, | |
context | |
) | |
# OFFSET target elevation as numerical parameter | |
offset = self.parameterAsDouble( | |
parameters, | |
self.OFFSET, | |
context | |
) | |
# Point ID from FIELD | |
point_id = self.parameterAsFields( | |
parameters, | |
self.ID, | |
context | |
)[0] | |
# Final OUTPUT Multipolygon | |
final_output = self.parameterAsOutputLayer( | |
parameters, | |
self.OUTPUT, | |
context | |
) | |
features = source.getFeatures() | |
for j, i in enumerate(features): | |
# Starting GRASS viewshed | |
# define output path fro GRASS temp file | |
view_path = os.path.join( | |
QgsProcessingUtils.tempFolder(), | |
'viewshed_{}.tif'.format(i[point_id]) | |
) | |
feedback.pushDebugInfo(str(view_path)) | |
geom = i.geometry().asPoint() | |
grass_viewshed = processing.run("grass7:r.viewshed", | |
{ | |
'input':dtm.source(), | |
# coordinates from every feature | |
'coordinates': '{},{}'.format(geom.x(), geom.y()), | |
# elevation from point FIELD | |
'observer_elevation': i[elevation], | |
# OFFSET elevation from parameter | |
'target_elevation': offset, | |
# ELEVATION/RADIUS from parameter | |
'max_distance': distance, | |
'refraction_coeff':0.14286, | |
'memory':2000, | |
'-c':True, | |
'-r':True, | |
'-b':True, | |
'-e':False, | |
'GRASS_REGION_PARAMETER':None, | |
'GRASS_REGION_CELLSIZE_PARAMETER':0, | |
'GRASS_RASTER_FORMAT_OPT': '', | |
'GRASS_RASTER_FORMAT_META': '', | |
'output': view_path | |
}, context=context, feedback=feedback, onFinish=dummy)['output'] | |
# GDAL polygonize | |
gdal_polygonize = os.path.join( | |
QgsProcessingUtils.tempFolder(), | |
'polygonize_{}.shp'.format(i[point_id]) | |
) | |
gdal_polygonize = processing.run("gdal:polygonize", | |
{ | |
'INPUT': grass_viewshed, | |
'BAND': 1, | |
'FIELD': 'DN', | |
'EIGHT_CONNECTEDNESS':False, | |
'OUTPUT': gdal_polygonize | |
}, context=context, feedback=feedback, onFinish = dummy)['OUTPUT'] | |
# Extraction of VALUES = 1 from polygnized output | |
qgis_extraction = processing.run("native:extractbyattribute", | |
{ | |
'INPUT': gdal_polygonize, | |
'FIELD': 'DN', | |
'OPERATOR': 0, | |
'VALUE': 1, | |
'OUTPUT': 'memory:' | |
}, context=context, feedback=feedback, onFinish=dummy)['OUTPUT'] | |
# Dissolve geometries | |
qgis_dissolved = processing.run("native:dissolve", | |
{ | |
'INPUT': qgis_extraction, | |
'FIELD': ['DN'], | |
'OUTPUT': parameters[self.OUTPUT] | |
}, context=context, feedback=feedback, onFinish=dummy)['OUTPUT'] | |
# From here add fields to the final vector | |
# get output as QgsVectorLayer | |
layer = QgsVectorLayer(qgis_dissolved, 'temp', 'ogr') | |
# get dataProvider | |
pr = layer.dataProvider() | |
# create QgsFields | |
fields = QgsFields() | |
fields.append(QgsField('elevation', QVariant.Double)) | |
fields.append(QgsField('distance', QVariant.Double)) | |
fields.append(QgsField('offset', QVariant.Double)) | |
fields.append(QgsField('id', QVariant.Double)) | |
# add QgsFields with the dataProvider | |
pr.addAttributes(fields) | |
layer.updateFields() | |
# lookup to get idx of Fields | |
field1 = layer.fields().lookupField("elevation") | |
field2 = layer.fields().lookupField("distance") | |
field3 = layer.fields().lookupField("offset") | |
field4 = layer.fields().lookupField("id") | |
# add values to each Field | |
attr = { | |
field1 : i[elevation], | |
field2 : distance, | |
field3 : offset, | |
field4: i[point_id] | |
} | |
# commit the changes with the dataProvider | |
changes = {0: attr} | |
pr.changeAttributeValues(changes) | |
return {self.OUTPUT: qgis_dissolved} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment