Skip to content

Instantly share code, notes, and snippets.

@ygpark2
Last active May 26, 2020 05:18
Show Gist options
  • Save ygpark2/c0d0f53f921fb1da9f22b642bcfeb4cd to your computer and use it in GitHub Desktop.
Save ygpark2/c0d0f53f921fb1da9f22b642bcfeb4cd to your computer and use it in GitHub Desktop.
How to use Queue in threading
from queue import Queue, PriorityQueue
from threading import Thread
class DisplayManager(Thread):
def __init__(self, observer):
Thread.__init__(self)
self.queue = Queue()
observer.subscribe(self.queue.put)
def run(self):
while True:
change = self.queue.get()
print("run change => ", change)
print("queue size => ", self.queue.qsize())
class DisplayObservable:
listeners = []
def __init__(self):
pass
def __new__(self):
if not hasattr(self, 'instance'):
self.instance = super(DisplayObservable, self).__new__(self)
return self.instance
def subscribe(self, listener):
self.listeners.append(listener)
def onNotify(self, change):
for listener in self.listeners:
listener(change)
from display_observer import DisplayObservable
from display_manager import DisplayManager
if __name__ == '__main__':
do = DisplayObservable()
dm = DisplayManager(do)
dm.start()
print("Object created ", do)
print("listener len ", len(do.listeners))
do.onNotify("test data 1")
do1 = DisplayObservable()
print("Object created ", do1)
print("listener len ", len(do1.listeners))
do1.onNotify("test data 2")
do2 = DisplayObservable()
print("Object created ", do2)
print("listener len ", len(do2.listeners))
do2.onNotify("test data 3")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment