Last active
April 23, 2019 02:48
-
-
Save dchrostowski/aea07510fe5e76104eda25d740788a0f to your computer and use it in GitHub Desktop.
Generic task queue in Python 3: tasks run one at a time, in FIFO order, on a background worker thread, concurrently with the main thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue
import random
import string
import time
import traceback
from threading import Thread
class Task(object):
    """A deferred call: one callable plus the arguments it will be invoked with."""

    def __init__(self, *args, **kwargs):
        """Capture the callable and its call arguments.

        The callable is taken from the mandatory ``fn`` keyword and removed
        from ``kwargs``; a missing ``fn`` raises ``KeyError``. Every remaining
        positional and keyword argument is stored for ``execute``.
        """
        callback = kwargs.pop('fn')
        self.fn, self.args, self.kwargs = callback, args, kwargs

    def execute(self):
        """Invoke the stored callable with the stored arguments.

        The callable's return value is discarded.
        """
        positional, named = self.args, self.kwargs
        self.fn(*positional, **named)
class TaskQueue(object):
    """FIFO task queue drained by a single daemon worker thread.

    The constructor starts the worker. Tasks are executed one at a time, in
    the order they were enqueued, until ``finish`` is called.
    """

    def __init__(self, default_task_function=None):
        """Create the queue and start its worker thread.

        Args:
            default_task_function: Callable used for tasks enqueued without
                an explicit ``fn`` keyword. When omitted (or falsy), the
                ``task_fn`` placeholder is used, which raises unless a
                subclass overrides it.
        """
        self.queue = queue.Queue()
        self.thread = Thread(target=self.worker)
        self.thread.daemon = True
        # Looked up through ``self`` so a subclass override of ``task_fn`` is
        # honoured. (The original referenced a non-existent ``_task_fn`` and
        # raised AttributeError whenever no default function was supplied.)
        self.default_task_fn = default_task_function or self.task_fn
        self.thread.start()

    def enqueue(self, *args, **kwargs):
        """Schedule a task.

        Args:
            *args: Positional arguments passed to the task function.
            **kwargs: Keyword arguments passed to the task function. The
                optional ``fn`` keyword selects the callable to run and
                defaults to the queue's default task function.
        """
        kwargs.setdefault('fn', self.default_task_fn)
        self.queue.put(Task(*args, **kwargs))

    @staticmethod
    def task_fn(*args, **kwargs):
        """Placeholder task function; always raises.

        Raises:
            NotImplementedError: Always. Override this method or pass a
                function to the constructor.
        """
        raise NotImplementedError(
            "No task function defined! Either override this method or pass a function to the TaskQueue."
        )

    def worker(self):
        """Worker-thread loop: run queued tasks until the ``None`` sentinel arrives."""
        while True:
            task = self.queue.get()
            if task is None:
                self.queue.task_done()
                break
            try:
                task.execute()
            except Exception:
                # A failing task must not kill the only worker; otherwise the
                # remaining tasks never run and finish() blocks forever.
                traceback.print_exc()
            finally:
                # Always balance get() so queue.join() can return.
                self.queue.task_done()

    def finish(self):
        """Block until every queued task has run and the worker has exited.

        Calling this again after the worker has stopped returns immediately
        instead of waiting on a sentinel that nothing would consume.
        """
        if not self.thread.is_alive():
            return
        self.queue.put(None)
        self.queue.join()
        self.thread.join()
def main():
    """Demo: enqueue ten sleeping tasks, keep the main thread busy, then drain the queue."""
    # NOTE(review): the original indentation was lost; the "do stuff" message
    # is assumed to print once, after the enqueue loop -- confirm.

    def generate_task_id():
        """Return a random 8-character ID made of uppercase letters and digits."""
        alphabet = string.ascii_uppercase + string.digits
        return ''.join(random.choices(alphabet, k=8))

    def task_fn(task_id, wait_time):
        """Announce the task, sleep ``wait_time`` seconds, then announce completion."""
        print("task thread: task id %s will sleep %s seconds" % (task_id, wait_time))
        time.sleep(wait_time)
        print("task thread: task id %s has finished." % task_id)

    task_queue = TaskQueue(default_task_function=task_fn)

    # Hand ten tasks to the worker; each gets a random ID and sleep duration.
    for _ in range(10):
        new_id = generate_task_id()
        nap = random.randint(1, 5)
        print("main thread: enqueuing task ID %s." % new_id)
        task_queue.enqueue(new_id, nap)

    # Meanwhile the main thread does its own (simulated) work.
    print("main thread: do stuff.")
    for _ in range(5):
        pause = random.randint(1, 3)
        print("main thread: sleeping %s seconds" % pause)
        time.sleep(pause)

    print("main thread: awaiting task queue to finish...")
    task_queue.finish()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment