Skip to content

Instantly share code, notes, and snippets.

@relent95
Created June 19, 2023 09:24
Show Gist options
  • Save relent95/f05043544c79ac7e6c97972519396196 to your computer and use it in GitHub Desktop.
Save relent95/f05043544c79ac7e6c97972519396196 to your computer and use it in GitHub Desktop.
test_thread_pool.py
# This is a correction of the test code by Teun.
import sys
from datetime import datetime
import time
from PyQt6.QtCore import QThreadPool, QRunnable, QObject, pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import QMainWindow, QPushButton, QApplication
class WorkerSignals(QObject):
    """Signals emitted by a ``Worker`` while it runs.

    The signals are declared on a separate ``QObject`` subclass and
    attached to each ``Worker`` as its ``signals`` attribute.

    Supported signals are:
    - started: carries one ``object`` payload (the worker emits itself)
    - finished: carries one ``object`` payload (the worker emits itself)
    """

    # Both signals take a single generic Python object argument.
    started = pyqtSignal(object)
    finished = pyqtSignal(object)
class Worker(QRunnable):
    """Runnable that calls a function and reports start and finish via signals.

    The signals are exposed through ``self.signals`` (a ``WorkerSignals``
    instance); each one is emitted with this worker as its payload so that
    receivers can inspect ``worker.fun``.
    """

    def __init__(self, fun, *args, **kwargs):
        """Store the callable and the arguments to invoke it with.

        Args:
            fun: Callable executed when the worker runs.
            *args: Positional arguments forwarded to ``fun``.
            **kwargs: Keyword arguments forwarded to ``fun``.
        """
        super().__init__()
        # Store constructor arguments (re-used for processing)
        self.fun = fun
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Execute the function with the stored args and kwargs.

        Emits ``started`` before the call and ``finished`` after it.
        ``finished`` is emitted even when ``fun`` raises, so listeners
        always see a matching pair; the exception still propagates.
        """
        self.signals.started.emit(self)
        try:
            self.fun(*self.args, **self.kwargs)
        finally:
            # Always pair 'started' with 'finished', even on failure.
            self.signals.finished.emit(self)
class MainWindow(QMainWindow):
    """Demo window that runs dependent jobs on two thread pools.

    ``sequential_thread_pool`` is limited to one thread and is used for
    ``step1`` and ``step2``; ``concurrent_thread_pool`` keeps its default
    thread limit and is used for ``requirement_a`` and ``requirement_b``.
    """

    def __init__(self):
        """Create the Start button and the two thread pools."""
        super().__init__()
        self.button = QPushButton('Start', self)
        self.button.setGeometry(10, 10, 100, 30)
        self.button.clicked.connect(self.start)
        # Capped at one thread so the workers queued on it do not overlap.
        self.sequential_thread_pool = QThreadPool()
        self.sequential_thread_pool.setMaxThreadCount(1)
        # Left at the default thread limit.
        self.concurrent_thread_pool = QThreadPool()

    def start(self):
        """Queue step1 and then step2 on the sequential pool."""
        self.start_worker(self.step1, 'sequential')
        self.start_worker(self.step2, 'sequential')

    def on_worker_started(self, worker):
        """Print a timestamped line naming the function that started."""
        print(f'[{datetime.now()}] Started {worker.fun.__name__}')

    def on_worker_finished(self, worker):
        """Print a timestamped line naming the function that finished."""
        print(f'[{datetime.now()}] Finished {worker.fun.__name__}')

    def start_worker(self, fun, thread_pool):
        """Wrap ``fun`` in a ``Worker`` and start it on the named pool.

        Args:
            fun: Callable to run; it is invoked without arguments.
            thread_pool: Either ``'sequential'`` or ``'concurrent'``.

        Raises:
            ValueError: If ``thread_pool`` is not one of the known names.
                Previously such a worker was silently never started.
        """
        pools = {
            'sequential': self.sequential_thread_pool,
            'concurrent': self.concurrent_thread_pool,
        }
        # Validate before building the worker so a typo fails loudly.
        if thread_pool not in pools:
            raise ValueError(
                f'Unknown thread pool {thread_pool!r}; '
                f'expected one of {sorted(pools)}'
            )
        worker = Worker(fun)
        worker.signals.started.connect(self.on_worker_started)
        worker.signals.finished.connect(self.on_worker_finished)
        pools[thread_pool].start(worker)

    def step1(self):
        """Simulate the first step with two one-second sleeps."""
        # this is a step that needs to execute first
        time.sleep(1)  # do stuff
        print(f'[{datetime.now()}] Executing step1')
        time.sleep(1)  # do stuff

    def requirement_a(self):
        """Simulate requirement A with two one-second sleeps."""
        # this is a step that needs to execute after step1 finished, but
        # can happen concurrently with requirement_b
        time.sleep(1)  # do stuff
        print(f'[{datetime.now()}] Executing requirement_a')
        time.sleep(1)  # do stuff

    def requirement_b(self):
        """Simulate requirement B with two one-second sleeps."""
        # this is a step that needs to execute after step1 finished, but
        # can happen concurrently with requirement_a
        time.sleep(1)  # do stuff
        print(f'[{datetime.now()}] Executing requirement_b')
        time.sleep(1)  # do stuff

    def step2(self):
        """Start both requirements, wait for them, then do the final work."""
        # this is a step that needs to execute after step1, requirement_a
        # and requirement_b finished
        self.start_worker(self.requirement_a, 'concurrent')
        self.start_worker(self.requirement_b, 'concurrent')
        # wait for requirement_a and requirement_b to finish
        self.concurrent_thread_pool.waitForDone()
        time.sleep(1)  # do stuff
        print(f'[{datetime.now()}] Executing step2')
        time.sleep(1)  # do stuff
if __name__ == '__main__':
    # Build the application object first, then the window that uses it.
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    # Run the event loop and hand its return code to the interpreter.
    exit_code = app.exec()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment