Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demo - Producer/Consumer Problem
"""
Implementation 1: Infinite Loop in Consumers
The problem:
the serving line is stuck waiting for orders in an infinite loop (busy waiting),
so the serving-line threads never finish
"""
import queue
import threading
orders = queue.Queue()
def serving_line_or_consumer():
    """Serve orders from the shared ``orders`` queue forever.

    Every iteration takes one order off the queue (waiting in
    ``orders.get()`` until one is available) and then marks it as done.

    PROBLEM: the loop has no exit condition, so a thread running this
    function never returns.
    """
    while True:
        order = orders.get()  # wait here until an order arrives
        # Prepare the meal from `order`, assuming the GIL is released while
        # the meal is being prepared.
        orders.task_done()  # tell the queue that this order has been processed
def order_line_or_producer():
    """Take 200 orders and put each one on the shared ``orders`` queue."""
    orders_per_staff = 200  # each staff member on the order line takes this many
    for _ in range(orders_per_staff):
        orders.put("Order")
# Put 4 staff on the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Assign 6 staff to the serving line (consumers). These threads loop forever,
# so they are created as daemon threads: they must not keep the interpreter
# alive once every order has been served.
serving_line = [
    threading.Thread(target=serving_line_or_consumer, daemon=True)
    for _ in range(6)
]
# Put all staff to work.
for worker in order_line:
    worker.start()
for worker in serving_line:
    worker.start()
# Wait for the order line first. Calling orders.join() before the producers
# are done could return early, while orders are still being queued.
for worker in order_line:
    worker.join()
# Block until every queued order has been marked done by the serving line.
orders.join()
# The serving line is deliberately NOT joined: its threads never return, so
# joining them would block this script forever.
"""
Implementation 2: Sentinel Value to Stop Busy Waiting
The problem:
While using a sentinel value (None) prevents busy waiting,
it also terminates every consumer once the sentinel is received.
We want the serving line (consumers) to be ready
whenever there is an order, not to sign off from work as soon as there are no orders.
"""
import queue
import threading
orders = queue.Queue()
def serving_line_or_consumer():
    """Serve orders from the shared ``orders`` queue until a sentinel arrives.

    ``None`` is the sentinel value: after receiving it the consumer marks it
    as done, like any other queue item, and then stops.
    """
    while True:
        order = orders.get()
        received_sentinel = order is None  # sentinel means "no more orders"
        # Prepare the meal from `order`, assuming the GIL is released while
        # the meal is being prepared.
        orders.task_done()  # tell the queue that this item has been processed
        if received_sentinel:
            break
def order_line_or_producer():
    """Queue 200 orders on the shared ``orders`` queue, one at a time."""
    # Each staff member on the order line produces 200 orders.
    total_orders = 200
    for _ in range(total_orders):
        orders.put("Order")
# Put 4 staff on the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Assign 6 staff to the serving line (consumers).
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]
# Put all staff to work.
for worker in order_line:
    worker.start()
for worker in serving_line:
    worker.start()
# Wait until the order line has queued every order BEFORE sending sentinels.
# If a sentinel were queued while producers are still running, a consumer
# could pick it up and quit early, leaving later orders unserved and
# orders.join() blocked forever.
for worker in order_line:
    worker.join()
# Inform the serving line that there are no more orders: one sentinel per
# consumer, queued behind all real orders.
for _ in serving_line:
    orders.put(None)
# Block until all orders (and sentinels) have been marked done.
orders.join()
# Every consumer has received its sentinel, so these joins return.
for worker in serving_line:
    worker.join()
"""
Implementation 3: Use Semaphores
Using a semaphore is a better solution: the thread is put to idle while waiting to acquire the semaphore.
Thus there is neither busy waiting nor termination of the consumers.
"""
import queue
import threading
orders = queue.Queue()
has_order = threading.Semaphore(value=0)
def serving_line_or_consumer():
    """Serve one order from ``orders`` each time ``has_order`` is acquired.

    The consumer sleeps in ``has_order.acquire()`` until the semaphore's
    counter is larger than zero, then takes an order off the queue and marks
    it as done. It keeps going for as long as ``acquire()`` succeeds.
    """
    while True:
        # Acquire the semaphore, or sleep until its counter is larger than zero.
        if not has_order.acquire():
            return
        order = orders.get()
        # Prepare the meal from `order`, assuming the GIL is released while
        # the meal is being prepared.
        orders.task_done()  # tell the queue that this order has been processed
def order_line_or_producer():
    """Queue 200 orders, releasing ``has_order`` once for every order."""
    # Each staff member on the order line produces 200 orders.
    order_quota = 200
    for _ in range(order_quota):
        orders.put("Order")
        # Release the semaphore: increments its internal counter by 1, which
        # lets one waiting consumer proceed.
        has_order.release()
# Put 4 staff on the order line (producers).
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Assign 6 staff to the serving line (consumers). Once all orders are served
# they wait on the semaphore indefinitely, so they are created as daemon
# threads: they must not keep the interpreter alive after the work is done.
serving_line = [
    threading.Thread(target=serving_line_or_consumer, daemon=True)
    for _ in range(6)
]
# Put all staff to work.
for worker in order_line:
    worker.start()
for worker in serving_line:
    worker.start()
# Wait for the order line first. Calling orders.join() before the producers
# are done could return early, while orders are still being queued.
for worker in order_line:
    worker.join()
# Block until every queued order has been marked done by the serving line.
orders.join()
# The serving line is deliberately NOT joined: its threads stay parked on the
# semaphore waiting for more orders, so joining them would block forever.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.