Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Producer Consumer
import concurrent.futures
import logging
import random
import threading
# Unique marker object sent as the final message. Because it is a fresh
# object(), it cannot collide with any of the integer payloads.
SENTINEL = object()


def producer(_pipeline, message_count: int = 10) -> None:
    """Pretend we're getting messages from the network.

    Generates ``message_count`` random integers, hands each one to
    ``_pipeline.produce_message`` and finally sends ``SENTINEL`` so the
    receiving side knows no more messages will follow.

    Args:
        _pipeline: Object exposing ``produce_message(message, name)``.
        message_count: Number of random messages to send before the
            sentinel. Defaults to 10.
    """
    for _ in range(message_count):
        # randint is inclusive on both ends, so payloads range over 1..101.
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        _pipeline.produce_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    _pipeline.produce_message(SENTINEL, "Producer")
def consumer(_pipeline) -> None:
    """Pretend we're saving a number in the database.

    Pulls messages from ``_pipeline.consume_message`` one at a time and logs
    each of them, stopping as soon as ``SENTINEL`` is received.

    Args:
        _pipeline: Object exposing ``consume_message(name)``.
    """
    while True:
        message = _pipeline.consume_message("Consumer")
        if message is SENTINEL:
            # SENTINEL only marks the end of the stream; it is never stored.
            break
        logging.info("Consumer storing message: %s", message)
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.

    Two locks hand the single ``message`` slot back and forth:

    * ``producer_lock`` is free while the slot may be written.
    * ``consumer_lock`` is free while the slot holds an unread message.

    Each side acquires its own lock and releases the other side's lock, so
    calls to ``produce_message`` and ``consume_message`` strictly alternate,
    starting with ``produce_message``.
    """

    def __init__(self) -> None:
        # The single-element slot; 0 is only a placeholder until the first
        # produce_message call overwrites it.
        self.message: object = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Start with the consumer side locked: the consumer must not read
        # until the producer has stored a first message.
        self.consumer_lock.acquire()

    def consume_message(self, name: str) -> object:
        """Block until a message is available, then return it.

        Args:
            name: Label of the caller, used only in the debug log lines.

        Returns:
            The message most recently stored by ``produce_message``.
        """
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        # The slot has been read, so the producer may overwrite it now.
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def produce_message(self, message: object, name: str) -> None:
        """Block until the slot is free, then store ``message`` in it.

        Args:
            message: Value to hand over to the consumer.
            name: Label of the caller, used only in the debug log lines.
        """
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        # A fresh message is in the slot, so the consumer may read it now.
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
if __name__ == "__main__":
    # Named log_format rather than `format` so the builtin is not shadowed.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
    # Uncomment to also see the per-lock debug trace emitted by Pipeline:
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    # Two workers: one thread runs the producer, the other the consumer.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(producer, pipeline),
            executor.submit(consumer, pipeline),
        ]

    # Leaving the with-block waits for both workers to finish. result()
    # re-raises any exception a worker hit, which would otherwise be lost.
    for future in futures:
        future.result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.