Skip to content

Instantly share code, notes, and snippets.

@Jerakin
Created April 18, 2023 09:31
Show Gist options
  • Save Jerakin/91f83a93b8914eab782356c62f2ac7cc to your computer and use it in GitHub Desktop.
Save Jerakin/91f83a93b8914eab782356c62f2ac7cc to your computer and use it in GitHub Desktop.
QRunnable Example
import time
from Qt import QtWidgets, QtCore
import runnable
class MainWindow(QtWidgets.QMainWindow):
    """Demo window that runs a slow task through a thread pool.

    The window contains a label used as a progress readout and a button that
    starts the task.  While a task is in flight the button is disabled and the
    active worker is kept in ``self.worker``.
    """

    def __init__(self, *args, **kwargs):
        """Build the widgets, show the window and create the thread pool.

        :param args: Positional arguments forwarded to ``QMainWindow``.
        :param kwargs: Keyword arguments forwarded to ``QMainWindow``.
        """
        super().__init__(*args, **kwargs)

        # The worker currently handed to the pool, or None when idle.
        self.worker = None

        self.label = QtWidgets.QLabel("Start")
        self.button = QtWidgets.QPushButton("Run Task")
        self.button.pressed.connect(self.run_task)

        column = QtWidgets.QVBoxLayout()
        column.addWidget(self.label)
        column.addWidget(self.button)

        container = QtWidgets.QWidget()
        container.setLayout(column)
        self.setCentralWidget(container)
        self.show()

        self.threadpool = QtCore.QThreadPool()

    def update_progress(self, n):
        """Display the progress value ``n`` on the label as a whole percentage."""
        percent_text = f"{n:.0%}"
        self.label.setText(percent_text)

    def execute_this_fn(self, progress_callback):
        """Simulated long-running job that ``run_task`` hands to the worker.

        Sleeps one second per step and reports the completed fraction
        (0.0, 0.25, ... 1.0) after each step.

        :param progress_callback: Object with an ``emit`` method; receives the
            fraction completed after every step.
        :return: The string ``"Calculation Done"``.
        """
        last_step = 4
        for step in range(last_step + 1):
            time.sleep(1)
            progress_callback.emit(step / last_step)
        return "Calculation Done"

    def on_success(self, s):
        """Print the value delivered by the worker's ``result`` signal."""
        print(s)

    def on_finish(self):
        """Forget the worker and re-enable the button once the task is over."""
        self.worker = None
        self.button.setEnabled(True)
        print("THREAD COMPLETE!")

    def run_task(self):
        """Create a worker for ``execute_this_fn``, wire its signals, start it."""
        # Block further presses until on_finish re-enables the button.
        self.button.setEnabled(False)

        # Only the function is passed here; any further args/kwargs given to
        # Worker would be forwarded to the function when it runs.
        task = runnable.Worker(self.execute_this_fn)
        self.worker = task

        hookups = (
            (task.signals.result, self.on_success),
            (task.signals.finished, self.on_finish),
            (task.signals.progress, self.update_progress),
        )
        for signal, slot in hookups:
            signal.connect(slot)

        # Execute
        self.threadpool.start(task)

    def closeEvent(self, event):
        """Ask the pool to cancel the tracked worker, if any, on window close."""
        if not self.worker:
            return
        self.threadpool.cancel(self.worker)
# Script entry: create the application object first, then the window, then
# hand control to the event loop. The order of these three statements matters.
app = QtWidgets.QApplication([])
# MainWindow.__init__ calls self.show() itself, so no explicit show() is needed;
# the module-level name keeps a reference to the window while the script runs.
window = MainWindow()
# Start the Qt event loop (presumably blocks until the application quits).
app.exec_()
"""
Create a threadpool
self.threadpool = QThreadPool()
Later create a Worker and hook up any signal
worker = Worker(self.execute_this_fn)
worker.signals.result.connect(self.print_output)
worker.signals.finished.connect(self.thread_complete)
worker.signals.progress.connect(self.progress_fn)
self.threadpool.start(worker)
"""
from Qt import QtCore
import time
import traceback, sys
class WorkerSignals(QtCore.QObject):
    """
    Defines the signals available from a running worker thread.

    finished -> no payload
        Emitted when the worker is done, whether or not the callable raised.
    error -> tuple (exctype, value, traceback.format_exc())
        Only fired if an exception is caught.
    result -> object
        The callable's return value; only fired if no exception is caught.
    progress -> float
        Progress value reported by the callable. The signal is declared as
        float (not int), so fractional values such as 0.25 can be emitted.
    """

    finished = QtCore.Signal()
    error = QtCore.Signal(tuple)
    result = QtCore.Signal(object)
    progress = QtCore.Signal(float)
class Worker(QtCore.QRunnable):
    """
    Runnable that executes a callable and reports back through signals.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The callable to run. It is invoked with ``args`` and ``kwargs``
        plus an extra ``progress_callback`` keyword argument holding this
        worker's ``progress`` signal.
    :type fn: function
    :param args: Positional arguments to pass to ``fn``
    :param kwargs: Keyword arguments to pass to ``fn``
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        # Signals live on a separate QObject, created here per worker.
        self.signals = WorkerSignals()

        # Keep the call description around until run() is invoked.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Always inject the progress signal; this replaces any
        # 'progress_callback' the caller may have supplied.
        self.kwargs['progress_callback'] = self.signals.progress

    @QtCore.Slot()
    def run(self):
        """
        Call ``fn`` with the stored arguments and emit the matching signals.

        Emits ``result`` on success or ``error`` on failure, and ``finished``
        in either case.
        """
        try:
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            # Deliberately catch everything: the failure is printed and then
            # reported through the error signal instead of escaping run().
            traceback.print_exc()
            report = (type(exc), exc, traceback.format_exc())
            self.signals.error.emit(report)
        else:
            # Hand the return value of the callable back to listeners.
            self.signals.result.emit(outcome)
        finally:
            # Done
            self.signals.finished.emit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment