Skip to content

Instantly share code, notes, and snippets.

@stigok
Last active February 8, 2018 23:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stigok/f7277e2db5de2e0945ef6991dda35bbd to your computer and use it in GitHub Desktop.
Save stigok/f7277e2db5de2e0945ef6991dda35bbd to your computer and use it in GitHub Desktop.
import queue
import threading
import sys
class ThreadedQueue:
def __init__(self, task, timeout=10):
self.threads = dict()
self.queues = dict()
self.task = task
self.timeout = timeout
def _worker(self, name):
print("Thread %s started" % name)
while True:
try:
item = self.queues[name].get(block=True, timeout=self.timeout)
except queue.Empty:
# Kill thread if it timed out while waiting for messages
break
# print("%s says: %s" % (name, item))
self.task(name, item)
self.queues[name].task_done()
print("Thread closing for %s" % name)
def _spawn_new_thread(self, name):
# Make sure it doesn't exist alreadu
t = self.threads.get(name)
if t is not None:
raise ValueError("Thread already exists")
self.threads[name] = threading.Thread(target=self._worker, args=(name))
self.queues[name] = queue.Queue()
self.threads[name].start()
def _gc_threads(self):
empty_keys = [k for k,v in self.threads.items() if not v.is_alive()]
for k in empty_keys:
del self.threads[k]
del self.queues[k]
print("Removed inactive thread and queue for '%s'" % k)
def handle(self, name, msg):
self._gc_threads()
if self.threads.get(name) is None:
self._spawn_new_thread(name)
self.queues[name].put(msg)
def test():
def task(name, msg):
print("Queue with name %s receieved: %s" % (name, msg))
tq = ThreadedQueue(task, timeout=3)
print("Write your message. First letter of each message acts as key.")
while True:
line = sys.stdin.readline()
msg = line.rstrip()
if len(msg) > 0:
key = msg[0]
tq.handle(key, msg)
if __name__ == "__main__":
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment