Skip to content

Instantly share code, notes, and snippets.

@dchrostowski
Last active April 23, 2019 02:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dchrostowski/aea07510fe5e76104eda25d740788a0f to your computer and use it in GitHub Desktop.
Save dchrostowski/aea07510fe5e76104eda25d740788a0f to your computer and use it in GitHub Desktop.
Generic task queue in Python 3: tasks run sequentially (FIFO) on a single background worker thread, concurrently with the main thread
from threading import Thread
import queue
import time
import random
import string
class Task(object):
    """A deferred call: a callable bundled with the arguments to pass to it.

    The callable is supplied through the mandatory ``fn`` keyword argument.
    Every other positional and keyword argument is stored and forwarded to
    the callable unchanged when :meth:`execute` is called.
    """

    def __init__(self, *args, **kwargs):
        """Capture the callable and its call arguments.

        Args:
            *args: Positional arguments forwarded to ``fn``.
            **kwargs: Must contain ``fn``, the callable to run. All other
                entries are forwarded to ``fn`` as keyword arguments.

        Raises:
            KeyError: If the ``fn`` keyword argument is missing.
        """
        # ``fn`` is popped so that it is not forwarded to the callable itself.
        self.fn = kwargs.pop('fn')
        self.args = args
        self.kwargs = kwargs

    def execute(self):
        """Call the stored callable with the stored arguments.

        Returns:
            Whatever the callable returns, so a Task can also be used
            directly when the caller needs the result.
        """
        return self.fn(*self.args, **self.kwargs)
class TaskQueue(object):
    """FIFO task queue drained by a single background daemon thread.

    Tasks are executed one at a time, in the order they were enqueued, on
    the worker thread started by the constructor. Call :meth:`finish` to
    wait for all queued work to complete and stop the worker.
    """

    def __init__(self, default_task_function=None):
        """Create the queue and start its worker thread.

        Args:
            default_task_function: Callable used for tasks enqueued without
                an explicit ``fn``. When omitted (or falsy), ``task_fn`` is
                used, which subclasses may override.
        """
        self.queue = queue.Queue()
        # Fall back to the overridable ``task_fn`` hook. (The original code
        # looked up a non-existent ``_task_fn`` attribute here and raised
        # AttributeError whenever no default function was passed.)
        self.default_task_fn = default_task_function or self.task_fn
        self.thread = Thread(target=self.worker)
        # Daemon thread: a forgotten finish() never keeps the process alive.
        self.thread.daemon = True
        self.thread.start()

    def enqueue(self, *args, **kwargs):
        """Schedule a call for execution on the worker thread.

        Args:
            *args: Positional arguments for the task function.
            **kwargs: Keyword arguments for the task function. An optional
                ``fn`` entry selects the callable to run; without it the
                queue's default task function is used.
        """
        kwargs.setdefault('fn', self.default_task_fn)
        self.queue.put(Task(*args, **kwargs))

    @staticmethod
    def task_fn(*args, **kwargs):
        """Placeholder task function used when none was configured.

        Raises:
            Exception: Always, to signal that no task function is defined.
        """
        raise Exception("No task function defined! Either override this method or pass a function to the TaskQueue.")

    def worker(self):
        """Run queued tasks until the ``None`` sentinel is received.

        An exception raised by a task is reported on stderr and does not
        stop the worker. ``task_done()`` is called for every item taken from
        the queue, so ``queue.join()`` in :meth:`finish` cannot be left
        waiting on a failed task.
        """
        import traceback

        while True:
            task = self.queue.get()
            try:
                if task is None:
                    # Sentinel placed by finish(): stop processing.
                    break
                task.execute()
            except Exception:
                # Keep draining the queue; one bad task must not strand
                # the tasks queued behind it.
                traceback.print_exc()
            finally:
                self.queue.task_done()

    def finish(self):
        """Wait for all queued tasks to complete, then stop the worker.

        Blocks until the queue is drained and the worker thread has exited.
        Calling it again after the worker has stopped returns immediately.
        """
        if not self.thread.is_alive():
            # No consumer is left; putting another sentinel and joining the
            # queue would block forever.
            return
        self.queue.put(None)
        self.queue.join()
        self.thread.join()
def main():
    """Demo driver: queue ten sleeping tasks, do other work, then wait.

    Ten tasks with random IDs and random sleep times are enqueued on a
    TaskQueue. While the worker thread runs them, the main thread sleeps
    through five random pauses of its own and finally blocks until the
    queue has finished.
    """
    id_alphabet = string.ascii_uppercase + string.digits

    def generate_task_id():
        """Return a random 8-character ID of uppercase letters and digits."""
        picked = random.choices(id_alphabet, k=8)
        return ''.join(picked)

    def task_fn(task_id, wait_time):
        """Simulated job: announce, sleep ``wait_time`` seconds, announce."""
        print("task thread: task id %s will sleep %s seconds" % (task_id, wait_time))
        time.sleep(wait_time)
        print("task thread: task id %s has finished." % task_id)

    task_queue = TaskQueue(default_task_function=task_fn)

    # Producer side: hand ten jobs to the worker thread.
    for _ in range(10):
        new_id = generate_task_id()
        nap = random.randint(1, 5)
        print("main thread: enqueuing task ID %s." % new_id)
        task_queue.enqueue(new_id, nap)

    print("main thread: do stuff.")

    # The main thread keeps itself busy while the worker drains the queue.
    for _ in range(5):
        pause = random.randint(1, 3)
        print("main thread: sleeping %s seconds" % pause)
        time.sleep(pause)

    print("main thread: awaiting task queue to finish...")
    task_queue.finish()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment