Skip to content

Instantly share code, notes, and snippets.

@radium226
Created September 6, 2016 14:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save radium226/dd7dbdf5bd254a0eb40ecd57f7110323 to your computer and use it in GitHub Desktop.
Save radium226/dd7dbdf5bd254a0eb40ecd57f7110323 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import time
import random
import queue
import threading
from concurrent import futures
# (min, max) sleep bounds in whole seconds, both ends inclusive, as passed to
# random.randint() by sleep_randomly(). Used by the quick steps (step_1, step_3).
SHORT_DURATION_RANGE = (1, 2)
# Same shape as above; used by the slow step (step_2).
LONG_DURATION_RANGE = (2, 4)
def sleep_randomly(duration_range=SHORT_DURATION_RANGE):
    """Block the calling thread for a random whole number of seconds.

    Args:
        duration_range: ``(min, max)`` pair unpacked into ``random.randint``,
            so both bounds are inclusive.
    """
    time.sleep(random.randint(*duration_range))
def step_1(item):
    """Return ``item`` upper-cased after a short simulated delay.

    An entry marker is printed before the delay and an exit marker after it,
    so the interleaving of concurrent calls is visible on stdout.
    """
    entry_marker = " ==> step_1(%s)" % item
    print(entry_marker)
    sleep_randomly(duration_range=SHORT_DURATION_RANGE)
    exit_marker = " <== step_1(%s)" % item
    print(exit_marker)
    shouted = item.upper()
    return shouted
def step_2(item):
    """Return ``item`` reversed after a long simulated delay.

    An entry marker is printed before the delay and an exit marker after it,
    so the interleaving of concurrent calls is visible on stdout.
    """
    entry_marker = " --> step_2(%s)" % item
    print(entry_marker)
    sleep_randomly(duration_range=LONG_DURATION_RANGE)
    exit_marker = " <-- step_2(%s)" % item
    print(exit_marker)
    backwards = item[::-1]
    return backwards
def step_3(item):
    """Return ``item`` repeated twice after a short simulated delay.

    An entry marker is printed before the delay and an exit marker after it,
    so the interleaving of concurrent calls is visible on stdout.
    """
    entry_marker = " ~~> step_3(%s)" % item
    print(entry_marker)
    sleep_randomly(duration_range=SHORT_DURATION_RANGE)
    exit_marker = " <~~ step_3(%s)" % item
    print(exit_marker)
    doubled = item * 2
    return doubled
def worker(input_queue, output_queue, work):
    """Run one pipeline stage: apply ``work`` to queued items until a sentinel.

    Items are taken from ``input_queue`` and submitted to a pool of three
    threads. Each result is put on ``output_queue`` as soon as it is ready, so
    results may come out in a different order than the items went in. A
    ``None`` item is the stop sentinel: once it is read no further items are
    taken, and the function returns after all submitted work has finished.

    Every ``get()`` on ``input_queue`` is matched by exactly one
    ``task_done()``. For regular items it is issued only after the result has
    been forwarded (or the work has failed), so ``input_queue.join()`` does not
    return while an item that was already queued is still being processed.

    Args:
        input_queue: Queue supplying the items; ``None`` stops the stage.
        output_queue: Queue receiving ``work(item)`` for every item.
        work: Callable taking one item and returning its result. If it raises,
            the item produces no output; the exception leaves the done-callback,
            where ``concurrent.futures`` logs and ignores it.
    """
    def forward_result(future):
        # Put the result first and acknowledge second: by the time the input
        # queue counts the item as done, its output is already queued.
        try:
            output_queue.put(future.result())
        finally:
            # Runs on failure too, so a raising ``work`` cannot leave the
            # input queue's unfinished-task count stuck above zero.
            input_queue.task_done()

    # Leaving the ``with`` block waits for every submitted task to complete.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        while True:
            input_item = input_queue.get()
            if input_item is None:
                # The sentinel was fetched with get() like any other item and
                # must be acknowledged, or input_queue.join() would never return.
                input_queue.task_done()
                break
            executor.submit(work, input_item).add_done_callback(forward_result)
def iter_queue(queue):
    """Yield items taken from ``queue`` until a ``None`` sentinel is read.

    Each ``get()`` blocks until an item is available. Every item retrieved,
    the sentinel included, is acknowledged with ``task_done()``, so a later
    ``queue.join()`` can return once iteration has finished.

    Args:
        queue: Queue to drain; a ``None`` item ends the iteration and is not
            yielded.

    Yields:
        The items in the order ``queue.get()`` returns them.
    """
    while True:
        item = queue.get()
        if item is None:
            # The sentinel counts as an unfinished task as well.
            queue.task_done()
            return
        try:
            yield item
        finally:
            # Also runs when the consumer stops early and the generator is
            # closed, so the item already handed out is still acknowledged.
            queue.task_done()
if __name__ == '__main__':
    # Three-stage pipeline:
    #   queue_0 -> step_1 -> queue_1 -> step_2 -> queue_2 -> step_3 -> queue_3
    queue_0 = queue.Queue()
    queue_1 = queue.Queue()
    thread_1 = threading.Thread(target=worker, args=(queue_0, queue_1, step_1))
    queue_2 = queue.Queue()
    thread_2 = threading.Thread(target=worker, args=(queue_1, queue_2, step_2))
    queue_3 = queue.Queue()
    thread_3 = threading.Thread(target=worker, args=(queue_2, queue_3, step_3))

    print("Starting threads... ")
    threads = [thread_1, thread_2, thread_3]
    for thread in threads:
        thread.start()

    print("Putting items... ")
    for item in ['Bonjour', 'Les', 'Amis', 'Comment', 'Ça', 'Va']:
        queue_0.put(item)

    print("Waiting for queues... ")
    # Shut the stages down in order. A stage only gets its stop sentinel once
    # the previous stage's thread has exited, i.e. once everything that stage
    # will ever produce is already sitting in this stage's input queue.
    # Joining threads instead of queues avoids returning early from join() on
    # a queue that simply has not received anything yet.
    stage_inputs = [queue_0, queue_1, queue_2]
    for stage_thread, stage_input in zip(threads, stage_inputs):
        stage_input.put(None)
        stage_thread.join()

    # All results are now in queue_3; terminate it so the iteration ends
    # instead of blocking forever on an empty queue.
    queue_3.put(None)
    for item in iter_queue(queue_3):
        print(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment