Skip to content

Instantly share code, notes, and snippets.

@t-book
Created June 25, 2024 15:10
Show Gist options
  • Save t-book/c8b4ff98cf4533cd2b7f72d39c9c24fb to your computer and use it in GitHub Desktop.
Save t-book/c8b4ff98cf4533cd2b7f72d39c9c24fb to your computer and use it in GitHub Desktop.
import os
import subprocess
import platform
import re
import threading
from qgis.PyQt import QtWidgets, uic, QtCore
from qgis.core import QgsProject, QgsVectorLayer, QgsFeature, QgsField, QgsMessageLog, Qgis, QgsApplication, QgsWkbTypes
from qgis.utils import iface
# Build the widget's form class at import time from the Qt Designer file that
# sits next to this module; the second value returned by loadUiType is unused.
FORM_CLASS, _ = uic.loadUiType(
    os.path.join(
        os.path.dirname(__file__),
        'survey_2_gis_demo_dockwidget_base.ui',
    )
)
class Survey2GisDemoDockWidget(QtWidgets.QDockWidget, FORM_CLASS):
    """Dock widget that runs the bundled survey2gis CLI and loads its output.

    The external tool is executed on a worker thread so the QGIS UI stays
    responsive.  The worker never touches Qt or QGIS objects directly; it
    hands its result to the GUI thread through the ``_processFinished``
    signal, and all widget / project updates happen in the connected slot.
    """

    closingPlugin = QtCore.pyqtSignal()

    # Emitted by the worker when the external process has ended.
    # Arguments: merged stdout+stderr text, error text ('' when the process
    # could be run), output directory, output base name.
    _processFinished = QtCore.pyqtSignal(str, str, str, str)

    # How long the external tool may run before it is terminated.
    PROCESS_TIMEOUT_MS = 60000

    # Location of the CLI binary below '<plugin dir>/bin', keyed by the
    # lower-cased value of platform.system().
    _BINARY_LOCATIONS = {
        'windows': ('win32', 'cli-only', 'survey2gis.exe'),
        'linux': ('linux64', 'cli-only', 'survey2gis'),
    }

    # Suffixes appended to the output base name to find the shapefiles
    # that are loaded after a successful run.
    _SHAPEFILE_SUFFIXES = ('_poly.shp', '_line.shp', '_point.shp')

    def __init__(self, parent=None):
        """Constructor."""
        super(Survey2GisDemoDockWidget, self).__init__(parent)
        # Set up the user interface from Designer.
        self.setupUi(self)

        # Process state; initialised here so that handle_timeout() and the
        # "already running" check never hit a missing attribute.
        self.process = None
        self.process_thread = None

        # The timeout timer is created once, up front, so the finish handler
        # can always stop it regardless of how quickly the worker ends.
        self.timer = QtCore.QTimer(self)
        self.timer.setSingleShot(True)
        self.timer.timeout.connect(self.handle_timeout)

        # Results of the worker thread are delivered to the GUI thread here.
        self._processFinished.connect(self._on_process_finished)

        # Connect buttons to their slots
        self.input_select_button.clicked.connect(self.select_input_files)
        self.parser_select_button.clicked.connect(self.select_parser_file)
        self.process_button.clicked.connect(self.process_files)
        self.input_data_reset.clicked.connect(lambda: self.reset_text_field(self.input_select))
        self.parser_reset.clicked.connect(lambda: self.reset_text_field(self.parser_select))

        # Create a button group for the radio buttons
        self.label_mode_group = QtWidgets.QButtonGroup(self)
        self.label_mode_group.addButton(self.label_mode_center)
        self.label_mode_group.addButton(self.label_mode_first)
        self.label_mode_group.addButton(self.label_mode_last)

        # Ensure the Linux binary is executable
        self.ensure_linux_binary_executable()

    def closeEvent(self, event):
        """Notify listeners that the dock widget is closing, then accept."""
        self.closingPlugin.emit()
        event.accept()

    def _binary_path(self):
        """Return the path of the bundled binary, or None on an unsupported OS."""
        parts = self._BINARY_LOCATIONS.get(platform.system().lower())
        if parts is None:
            return None
        return os.path.join(os.path.dirname(__file__), 'bin', *parts)

    def _log(self, message):
        """Write *message* to the QGIS message log and the widget's log view.

        Must only be called from the GUI thread because it touches a widget.
        """
        QgsMessageLog.logMessage(message, 'Plugin')
        self.output_log.append(message)

    def ensure_linux_binary_executable(self):
        """Ensure the Linux binary has executable permissions."""
        if platform.system().lower() != 'linux':
            return
        binary_path = self._binary_path()
        if not os.path.exists(binary_path):
            return
        try:
            st = os.stat(binary_path)
            os.chmod(binary_path, st.st_mode | 0o111)
        except OSError as e:
            # A read-only installation must not prevent the widget from
            # being constructed; the problem surfaces when the tool is run.
            QgsMessageLog.logMessage(
                f"Could not make '{binary_path}' executable: {e}", 'Plugin')

    def select_input_files(self):
        """Open file dialog to select multiple input files."""
        files, _ = QtWidgets.QFileDialog.getOpenFileNames(
            self, "Select Input File(s)", "", "All Files (*)")
        if files:
            self.input_select.setText("; ".join(files))

    def select_parser_file(self):
        """Open file dialog to select a parser file."""
        file, _ = QtWidgets.QFileDialog.getOpenFileName(
            self, "Select Parser File", "", "All Files (*)")
        if file:
            self.parser_select.setText(file)

    def reset_text_field(self, field):
        """Clear the text of the given text field widget."""
        field.setText("")

    def get_selected_label_mode(self):
        """Get the selected label mode from the radio button group.

        Returns the text of the checked radio button, or None when no
        button is checked.
        """
        selected_button = self.label_mode_group.checkedButton()
        if selected_button:
            return selected_button.text()
        return None

    def show_success_message(self, message):
        """Display a success message using QgsMessageBar."""
        message_bar = iface.messageBar()
        message_bar.pushMessage("Success", message, level=Qgis.Info, duration=5)

    def show_error_message(self, message):
        """Display an error message using QgsMessageBar."""
        message_bar = iface.messageBar()
        message_bar.pushMessage("Error", message, level=Qgis.Critical, duration=5)

    def process_files(self):
        """Process the files using the survey2gis command line tool.

        Validates the user input, builds the command line and starts the
        tool on a worker thread.  Results are handled in
        ``_on_process_finished`` once the tool has ended.
        """
        # Refuse to start a second run while one is still in progress.
        if self.process_thread is not None and self.process_thread.is_alive():
            self.show_error_message("A survey2gis process is already running.")
            return

        # Get the user inputs
        parser_path = self.parser_select.text().strip()
        # Splitting an empty field yields [''], so drop blank entries.
        input_files = [
            path.strip()
            for path in self.input_select.text().split("; ")
            if path.strip()
        ]
        output_base_name = self.output_base_input.text().strip()
        label_mode = self.get_selected_label_mode()

        if not parser_path:
            self.show_error_message("Please select a parser file.")
            return
        if not input_files:
            self.show_error_message("Please select at least one input file.")
            return
        if not output_base_name:
            self.show_error_message("Please enter an output base name.")
            return

        # Determine the binary path based on the OS
        binary_path = self._binary_path()
        if binary_path is None:
            self.show_error_message("Your operating system is not supported.")
            return

        # Build the output directory path and create it if it doesn't exist
        output_dir = os.path.join(os.path.dirname(__file__), 'output')
        try:
            os.makedirs(output_dir, exist_ok=True)
        except OSError as e:
            self.show_error_message(f"An error occurred: {e}")
            return

        # Build the command
        command = [binary_path, '-p', parser_path]
        if label_mode:
            # No radio button checked means no label mode option at all.
            command.append('--label-mode-line=' + label_mode)
        command += ['-o', output_dir, '-n', output_base_name]
        command += input_files

        self.output_log.clear()
        self.output_log.append(' '.join(command))

        # Arm the timeout before the worker starts so the finish handler can
        # never run against a timer that has not been started yet.
        self.process = None
        self.timer.start(self.PROCESS_TIMEOUT_MS)

        self.process_thread = threading.Thread(
            target=self.run_process,
            args=(command, output_dir, output_base_name),
            daemon=True,
        )
        self.process_thread.start()

    def run_process(self, command, output_dir=None, output_base_name=None):
        """Run *command* and report its merged output via ``_processFinished``.

        This method is the worker-thread entry point.  It only deals with
        the subprocess and emits a signal; it must not touch widgets, the
        message bar or the QGIS project.

        :param command: argument list passed to ``subprocess.Popen``.
        :param output_dir: directory the tool writes to; forwarded to the
            finish handler so it can locate the produced shapefiles.
        :param output_base_name: base name of the produced files; forwarded
            to the finish handler.
        """
        output_text = ''
        error_text = ''
        try:
            # Execute the command and capture the output
            self.process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = self.process.communicate()
            # Merge stdout and stderr; never fail on undecodable bytes.
            output_text = (stdout + stderr).decode('utf-8', errors='replace')
        except Exception as e:  # thread boundary: report instead of dying silently
            error_text = str(e) or type(e).__name__
        self._processFinished.emit(
            output_text, error_text, output_dir or '', output_base_name or '')

    @QtCore.pyqtSlot(str, str, str, str)
    def _on_process_finished(self, output_log, error_text, output_dir, output_base_name):
        """Handle the result of ``run_process``; connected to ``_processFinished``."""
        # Stop the timer if the process finishes before timeout
        self.timer.stop()
        try:
            if error_text:
                raise RuntimeError(error_text)
            self._handle_process_output(output_log, output_dir, output_base_name)
        except Exception as e:
            self.output_log.append(f"An error occurred: {e}")
            QgsMessageLog.logMessage(f"An error occurred: {e}", 'Plugin')
            self.show_error_message(f"An error occurred: {e}")

    def _handle_process_output(self, output_log, output_dir, output_base_name):
        """Evaluate the tool output and load the produced shapefiles."""
        # Write the output to the log
        self.output_log.append(output_log)

        # Check for errors in the output
        if 'ERROR' in output_log:
            self.show_error_message("Error occurred during processing.")
            return

        # Only continue when the tool reported that it produced output.
        if not re.search(r"Output files produced", output_log):
            return
        self.show_success_message("Output files successfully produced.")
        self.output_log.append("Output files successfully produced.")

        if not (output_dir and output_base_name):
            # run_process() was called without output information.
            return

        # Load the shapefiles that exist into memory layers
        memory_layers = []
        for suffix in self._SHAPEFILE_SUFFIXES:
            shapefile = os.path.join(output_dir, output_base_name + suffix)
            self.output_log.append("Processing: " + shapefile)
            if not os.path.exists(shapefile):
                self.output_log.append(f"Shapefile not found: {shapefile}")
                continue
            memory_layer = self.load_shapefile_to_memory_layer(shapefile)
            if memory_layer and memory_layer.isValid():
                self.output_log.append("Memory Layer is valid")
                memory_layers.append(memory_layer)
            else:
                self.output_log.append("Memory Layer failed")
                QgsMessageLog.logMessage(
                    f"Failed to create memory layer for: {os.path.basename(shapefile)}",
                    'Plugin')

        # Add the memory layers to the project
        for layer in memory_layers:
            QgsProject.instance().addMapLayer(layer)
            self.output_log.append(f"Added {layer.name()} to project")

        # Clean up the output directory after all layers are added
        self.delete_output_files(output_dir, output_base_name)

    def handle_timeout(self):
        """Handle the timeout event."""
        process = self.process
        if process is not None and process.poll() is None:
            process.terminate()
            self.show_error_message("The process was terminated due to timeout.")
            self.output_log.append("The process was terminated due to timeout.")

    def delete_output_files(self, directory, current_base_name):
        """Delete all files in the given directory except those related to the current base name.

        Files and sub-directories whose name contains *current_base_name*
        are kept.  Individual failures are logged and do not stop the
        clean-up.
        """
        self._log(f"Deleting files in '{directory}' except those related to '{current_base_name}'...")
        try:
            # Walk bottom-up so the contents of a sub-directory are removed
            # before os.rmdir() is attempted on the directory itself.
            for root, dirs, files in os.walk(directory, topdown=False):
                for file_name in files:
                    if current_base_name in file_name:
                        continue
                    file_path = os.path.join(root, file_name)
                    try:
                        os.remove(file_path)
                        self._log(f"Deleted file: {file_path}")
                    except OSError as e:
                        self._log(f"Failed to delete file: {file_path}. Error: {e}")
                for dir_name in dirs:
                    if current_base_name in dir_name:
                        continue
                    dir_path = os.path.join(root, dir_name)
                    try:
                        os.rmdir(dir_path)
                        self._log(f"Deleted directory: {dir_path}")
                    except OSError as e:
                        self._log(f"Failed to delete directory: {dir_path}. Error: {e}")
            self._log(
                f"All files inside the output directory '{directory}' except those "
                f"related to '{current_base_name}' have been deleted.")
        except Exception as e:
            self._log(f"Failed to delete files in '{directory}'. Error: {e}")

    def load_shapefile_to_memory_layer(self, shapefile_path):
        """Load a shapefile into a memory layer.

        Returns the new memory layer, named after the shapefile, or None
        when the shapefile cannot be read or the memory layer cannot be
        created.
        """
        shapefile_layer = QgsVectorLayer(shapefile_path, 'Shapefile Layer', 'ogr')
        if not shapefile_layer.isValid():
            self._log(f"Failed to load the shapefile: {shapefile_path}")
            return None

        # Get CRS, fields, and features from the shapefile
        crs = shapefile_layer.crs()
        fields = shapefile_layer.fields()
        features = list(shapefile_layer.getFeatures())

        # Create a new memory layer with the same geometry type and CRS
        geometry_type = QgsWkbTypes.displayString(shapefile_layer.wkbType())
        memory_layer = QgsVectorLayer(
            f"{geometry_type}?crs={crs.authid()}",
            os.path.basename(shapefile_path),
            'memory')
        if not memory_layer.isValid():
            self._log(f"Failed to create memory layer for: {os.path.basename(shapefile_path)}")
            return None
        if crs.isValid() and memory_layer.crs() != crs:
            # The authid may be empty for a CRS without an authority code;
            # assign the CRS object directly in that case.
            memory_layer.setCrs(crs)

        provider = memory_layer.dataProvider()
        provider.addAttributes(fields.toList())
        memory_layer.updateFields()

        # Write straight through the provider; no edit session is needed.
        added, _ = provider.addFeatures(features)
        if not added:
            self._log(f"Failed to copy features from: {shapefile_path}")
            return None
        # Provider-level writes do not refresh the cached layer extent.
        memory_layer.updateExtents()
        return memory_layer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment