Last active
November 23, 2019 11:38
-
-
Save melvinkcx/d097ae42aef1ea43e6e8e3d0af2fda95 to your computer and use it in GitHub Desktop.
Demo - Producer/Consumer Problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implementation 1: Infinite Loop in Consumers
The problem:
    The serving line is stuck waiting for orders in an infinite loop:
    the consumers never finish, so their threads can never be joined.
""" | |
import queue | |
import threading | |
# Shared queue of pending orders: the order line puts, the serving line gets.
orders = queue.Queue(maxsize=0)  # maxsize=0 (the default) means unbounded
def serving_line_or_consumer():
    """Serve orders from the shared `orders` queue forever.

    Each pass takes one order off the queue and marks it as processed.
    The loop has no exit condition, so this function never returns.
    """
    while True:  # PROBLEM: no exit condition, this staff member never signs off
        order = orders.get()  # blocks until an order is available
        # Placeholder: prepare meals from `order`, assuming the GIL is
        # released while the meal is being prepared.
        orders.task_done()  # tell the queue this order has been processed
def order_line_or_producer():
    """Place 200 orders on the shared `orders` queue."""
    # Each staff member in the order line produces 200 orders.
    placed = 0
    while placed < 200:
        orders.put("Order")
        placed += 1
# Let's put 4 staff into the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line (consumers). The consumers loop
# forever, so they are daemon threads: they cannot be joined, and as daemons
# they do not keep the interpreter alive once the main thread is done.
serving_line = [
    threading.Thread(target=serving_line_or_consumer, daemon=True)
    for _ in range(6)
]

# Put all staff to work: producers first, then consumers.
for t in order_line:
    t.start()
for t in serving_line:
    t.start()

# Wait for the producers first, so that every order is on the queue before
# we wait for the queue to drain. Otherwise orders.join() could return while
# orders are still being produced.
for t in order_line:
    t.join()

# "join" the queue: block until every order has been marked with task_done().
orders.join()

# The serving line is deliberately NOT joined: its threads never return, so
# joining them would block forever. They are daemon threads and end with the
# program.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implementation 2: Sentinel Value to Stop Busy Waiting
The problem:
    While using a sentinel value (`None`) prevents busy waiting,
    it terminates all consumers once there are no more orders.
    We want the serving line (consumers) to stay ready for whenever
    an order arrives, not to sign off from work as soon as there are no orders.
""" | |
import queue | |
import threading | |
# Shared queue of pending orders; `None` is used on it as a stop sentinel.
orders = queue.Queue(maxsize=0)  # maxsize=0 (the default) means unbounded
def serving_line_or_consumer():
    """Serve orders from `orders` until the sentinel value `None` arrives.

    Every item taken from the queue, including the sentinel itself, is
    marked as processed with `task_done()`. After the sentinel has been
    marked done, the function returns.
    """
    while True:
        new_order = orders.get()
        received_sentinel = new_order is None  # Check for Sentinel Value
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # the sentinel counts as a queue item too
        if received_sentinel:
            return
def order_line_or_producer():
    """Place 200 orders on the shared `orders` queue."""
    # Each staff member in the order line produces 200 orders.
    for order in ["Order"] * 200:
        orders.put(order)
# Let's put 4 staff into the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line (consumers).
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]

# Put all staff to work: producers first, then consumers.
for t in order_line:
    t.start()
for t in serving_line:
    t.start()

# Wait until the order line has finished producing BEFORE queueing the
# sentinels. If a sentinel were queued while orders are still being put, a
# consumer could receive `None` early and quit, leaving later orders
# unprocessed and orders.join() blocked forever.
for t in order_line:
    t.join()

# Inform the serving line there are no more orders: one sentinel per
# consumer, because each consumer stops after taking exactly one `None`.
for _ in serving_line:
    orders.put(None)

# "join" the queue: block until every order (and sentinel) is marked done.
orders.join()

# "join" the consumer threads; each one returns after its sentinel.
for t in serving_line:
    t.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implementation 3: Use Semaphores
Using a semaphore is a better solution: a consumer thread is put to sleep while it waits to acquire the semaphore.
Thus there is neither busy waiting nor termination of the consumers.
""" | |
import queue | |
import threading | |
# Shared queue of pending orders: the order line puts, the serving line gets.
orders = queue.Queue(maxsize=0)  # maxsize=0 (the default) means unbounded

# Counts orders that are ready to be served. It starts at zero, so a consumer
# calling acquire() sleeps until a producer calls release().
has_order = threading.Semaphore(0)
def serving_line_or_consumer():
    """Serve one order from `orders` for every `has_order` permit acquired.

    The function sleeps in `has_order.acquire()` while the semaphore counter
    is zero. There is no exit condition other than `acquire()` returning a
    falsy value.
    """
    while True:
        # Acquire the semaphore, or sleep until its counter is larger than zero.
        if not has_order.acquire():
            return
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # tell the queue this order has been processed
def order_line_or_producer():
    """Place 200 orders on `orders`, releasing `has_order` once per order."""
    # Each staff member in the order line produces 200 orders.
    remaining = 200
    while remaining > 0:
        orders.put("Order")
        # Release the semaphore: increment its internal counter by 1 so that
        # one waiting consumer may proceed.
        has_order.release()
        remaining -= 1
# Let's put 4 staff into the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line (consumers). Once the orders run
# out, the consumers sleep in has_order.acquire() indefinitely, so they are
# daemon threads: they cannot be joined, and as daemons they do not keep the
# interpreter alive once the main thread is done.
serving_line = [
    threading.Thread(target=serving_line_or_consumer, daemon=True)
    for _ in range(6)
]

# Put all staff to work: producers first, then consumers.
for t in order_line:
    t.start()
for t in serving_line:
    t.start()

# Wait for the producers first, so that every order is on the queue before
# we wait for the queue to drain. Otherwise orders.join() could return while
# orders are still being produced.
for t in order_line:
    t.join()

# "join" the queue: block until every order has been marked with task_done().
orders.join()

# The serving line is deliberately NOT joined: its threads stay asleep
# waiting for the next order, so joining them would block forever. They are
# daemon threads and end with the program.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment