-
-
Save relent95/f05043544c79ac7e6c97972519396196 to your computer and use it in GitHub Desktop.
test_thread_pool.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a correction of the test code by Teun. | |
import sys | |
from datetime import datetime | |
import time | |
from PyQt6.QtCore import QThreadPool, QRunnable, QObject, pyqtSignal, pyqtSlot | |
from PyQt6.QtWidgets import QMainWindow, QPushButton, QApplication | |
class WorkerSignals(QObject): | |
"""Defines the signals available from a running worker thread. | |
Supported signals are: | |
- started: no data | |
- finished: no data | |
""" | |
started = pyqtSignal(object) | |
finished = pyqtSignal(object) | |
class Worker(QRunnable): | |
"""Worker thread""" | |
def __init__(self, fun, *args, **kwargs): | |
"""Initialize method of the Worker thread""" | |
super(Worker, self).__init__() | |
# Store constructor arguments (re-used for processing) | |
self.fun = fun | |
self.args = args | |
self.kwargs = kwargs | |
self.signals = WorkerSignals() | |
@pyqtSlot() | |
def run(self): | |
"""Execute the function with passed args and kwargs""" | |
self.signals.started.emit(self) | |
self.fun(*self.args, **self.kwargs) | |
self.signals.finished.emit(self) | |
class MainWindow(QMainWindow): | |
def __init__(self): | |
super().__init__() | |
self.button = QPushButton('Start', self) | |
self.button.setGeometry(10, 10, 100, 30) | |
self.button.clicked.connect(self.start) | |
self.sequential_thread_pool = QThreadPool() | |
self.sequential_thread_pool.setMaxThreadCount(1) | |
self.concurrent_thread_pool = QThreadPool() | |
def start(self): | |
self.start_worker(self.step1, 'sequential') | |
self.start_worker(self.step2, 'sequential') | |
def on_worker_started(self, worker): | |
print(f'[{datetime.now()}] Started {worker.fun.__name__}') | |
def on_worker_finished(self, worker): | |
print(f'[{datetime.now()}] Finished {worker.fun.__name__}') | |
def start_worker(self, fun, thread_pool): | |
worker = Worker(fun) | |
worker.signals.started.connect(self.on_worker_started) | |
worker.signals.finished.connect(self.on_worker_finished) | |
if thread_pool == 'sequential': | |
self.sequential_thread_pool.start(worker) | |
elif thread_pool == 'concurrent': | |
self.concurrent_thread_pool.start(worker) | |
def step1(self): | |
# this is a step that needs to execute first | |
time.sleep(1) # do stuff | |
print(f'[{datetime.now()}] Executing step1') | |
time.sleep(1) # do stuff | |
def requirement_a(self): | |
# this is a step that needs to execute after step1 finished, but can happen concurrently with requirement_b | |
time.sleep(1) # do stuff | |
print(f'[{datetime.now()}] Executing requirement_a') | |
time.sleep(1) # do stuff | |
def requirement_b(self): | |
# this is a step that needs to execute after step1 finished, but can happen concurrently with requirement_a | |
time.sleep(1) # do stuff | |
print(f'[{datetime.now()}] Executing requirement_b') | |
time.sleep(1) # do stuff | |
def step2(self): | |
# this is a step that needs to execute after step1, requirement_a and requirement_b finished | |
self.start_worker(self.requirement_a, 'concurrent') | |
self.start_worker(self.requirement_b, 'concurrent') | |
self.concurrent_thread_pool.waitForDone() # wait for requirement_a and requirement_b to finish | |
time.sleep(1) # do stuff | |
print(f'[{datetime.now()}] Executing step2') | |
time.sleep(1) # do stuff | |
if __name__ == '__main__': | |
app = QApplication(sys.argv) | |
window = MainWindow() | |
window.show() | |
sys.exit(app.exec()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment