Skip to content

Instantly share code, notes, and snippets.

@jerome-diver
Last active June 7, 2019 18:32
Show Gist options
  • Save jerome-diver/6e11e6dbd8c63ea05be321d220b35fe6 to your computer and use it in GitHub Desktop.
Save jerome-diver/6e11e6dbd8c63ea05be321d220b35fe6 to your computer and use it in GitHub Desktop.
I'm trying to manage a list of threads: at the end of its run job, each thread emits itself through its own signal, and the connected slot should remove that thread from the list (but it doesn't). Why?
import random
import time

from PyQt5.QtCore import (
    QCoreApplication,
    QEventLoop,
    QObject,
    QReadWriteLock,
    QSemaphore,
    QThread,
    pyqtSignal,
    pyqtSlot,
)
class Share():
    """Key/value store whose contents are shared by every user of the class.

    All values live in the class-level ``data`` dict, so every instance and
    every static call reads and writes the same storage.
    """

    # Shared storage: key -> current value.
    data = dict()

    def __init__(self, key, value):
        """Store ``value`` under ``key`` in the shared dict."""
        Share.data[key] = value

    def __repr__(self):
        """Show the shared contents instead of the default object address."""
        return "{}({!r})".format(type(self).__name__, Share.data)

    @staticmethod
    def get(key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: if ``key`` was never stored.
        """
        return Share.data[key]

    @staticmethod
    def set(key, value):
        """Replace the value stored under ``key`` with the integer ``value``.

        Raises:
            TypeError: if ``value`` is not an ``int``.
        """
        # TypeError is a subclass of Exception, so callers that caught the
        # former generic Exception keep working.
        if not isinstance(value, int):
            raise TypeError("Need to be an int")
        print("new value:", value)
        Share.data[key] = value

    @staticmethod
    def multiply(key, value):
        """Multiply the value stored under ``key`` by ``value`` in place.

        Raises:
            KeyError: if ``key`` was never stored.
        """
        Share.data[key] *= value
class Worker(QThread):
    """Thread that multiplies one shared value by a random factor.

    When ``run`` is over, the worker emits ``end`` with itself as argument so
    that whoever tracks the workers can drop it.

    Args:
        state: truthy to multiply by the random factor, falsy to multiply by
            its inverse (``1 / factor``).
        data: object providing ``multiply(key, value)`` and ``get(key)``.
        key: key of the value to update inside ``data``.
    """

    # One permit per ideal thread: caps how many workers are inside ``run``.
    sma = QSemaphore(QThread.idealThreadCount())
    # Guards the shared data while a worker updates it.
    mutex = QReadWriteLock()
    # ``Worker`` is not bound yet while its own class body executes, so the
    # signal is declared with the base type; a Worker instance is a QThread.
    end = pyqtSignal(QThread)
    # Number of workers created so far; used to give each one a number.
    count = 0

    def __init__(self, state, data, key):
        super().__init__()
        Worker.count += 1
        self.number = Worker.count
        self.state = state
        self.data = data
        self.key = key
        # Let Qt schedule the deletion once the thread has finished, rather
        # than calling Qt methods from ``__del__`` (which the garbage
        # collector also invokes, at a time the Qt object may be gone).
        self.finished.connect(self.deleteLater)

    def run(self):
        """Apply the random operation to the shared value, then emit ``end``."""
        self.sma.acquire()
        try:
            factor = random.randint(1, 10)
            operation = factor if self.state else 1 / factor
            wait = random.randint(1, 10)
            self.mutex.lockForWrite()
            try:
                print("thread", self.number, "running | operation =>", operation,
                      "| threads aviable =>", self.sma.available())
                # The sleep happens while the write lock is held, exactly as
                # before, so workers go through this section one at a time.
                time.sleep(wait)
                self.data.multiply(self.key, operation)
                print("from", self.number, "data = ", self.data.get(self.key))
            finally:
                self.mutex.unlock()
        finally:
            # Give the permit back, otherwise every worker started after the
            # first ``idealThreadCount()`` ones blocks in ``acquire`` forever.
            self.sma.release()
            # Always announce the end so the owner never waits on a worker
            # that failed part-way through.
            self.end.emit(self)
class Test(QObject):
    """Start a batch of workers on ``origin`` and block until all have ended.

    Each worker's ``end`` signal is emitted from the worker thread while this
    object lives in the creating thread, so the slot call is queued and only
    delivered while an event loop runs in that thread. Waiting with
    ``time.sleep`` never delivers it; a local ``QEventLoop`` does.

    Args:
        origin: shared data object handed to every worker.
    """

    def __init__(self, origin):
        # An event loop needs an application object; reuse the existing one
        # or create it when the script has not done so.
        app = QCoreApplication.instance()
        if app is None:
            app = QCoreApplication([])
        super().__init__()
        self._app = app  # keep a reference so the application stays alive
        self._loop = QEventLoop()
        self.w = list()
        for _ in range(2, 15):
            state = random.randint(0, 1)
            new_worker = Worker(state, origin, "a")
            new_worker.end.connect(self.on_end)
            self.w.append(new_worker)
            print("there is", len(self.w), "workers running")
            new_worker.start()
        # Queued ``on_end`` calls are dispatched from inside ``exec_``; the
        # loop is left by ``on_end`` once the last worker has been removed.
        if self.w:
            self._loop.exec_()
        print("finally, origin is", origin)

    @pyqtSlot(QThread)
    def on_end(self, worker):
        """Forget ``worker`` and leave the wait loop after the last one."""
        print("bye bye", worker.number)
        # ``end`` is emitted from inside ``run``; wait for the thread to
        # really stop before dropping what may be its last reference.
        worker.wait()
        # Remove by value: a list cannot be indexed with the worker object.
        self.w.remove(worker)
        if not self.w:
            self._loop.quit()
# Seed the shared store: key "a" starts at 1.
origin = Share("a", 1)
# Constructing Test creates the workers on that store and starts them.
ok = Test(origin)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment