Created
May 16, 2019 09:09
-
-
Save lokesh1729/fe57f13c25ed38461ab335126072a4fd to your computer and use it in GitHub Desktop.
Producer Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures
import logging
import random
import threading
# Unique marker object: the producer sends it as its final message and the
# consumer stops as soon as it receives it (compared by identity with ``is``).
SENTINEL = object()
def producer(_pipeline, count=10):
    """Pretend we're getting messages from the network.

    Generates ``count`` random integers, logs each one and hands it to
    ``_pipeline.produce_message``, then sends ``SENTINEL`` so the consumer
    knows that no more messages are coming.

    Args:
        _pipeline: Object exposing ``produce_message(message, name)``.
        count: Number of random messages to produce before the sentinel.
            Defaults to 10, the value that used to be hard-coded.
    """
    for _ in range(count):
        # randint is inclusive at both ends, so values span 1..101.
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        _pipeline.produce_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    _pipeline.produce_message(SENTINEL, "Producer")
def consumer(_pipeline):
    """Pretend we're saving a number in the database.

    Repeatedly pulls messages from ``_pipeline.consume_message`` and logs
    each one until ``SENTINEL`` is received. The sentinel itself is not
    logged.

    Args:
        _pipeline: Object exposing ``consume_message(name)``.
    """
    while True:
        message = _pipeline.consume_message("Consumer")
        if message is SENTINEL:
            # Producer signalled that it is done; stop consuming.
            break
        logging.info("Consumer storing message: %s", message)
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.

    The two locks are used as hand-off signals: ``produce_message`` acquires
    ``producer_lock`` and releases ``consumer_lock``; ``consume_message``
    acquires ``consumer_lock`` and releases ``producer_lock``. As a result the
    calls strictly alternate, starting with a produce.
    """

    def __init__(self):
        # The single shared slot; 0 is only a placeholder until the first
        # produce_message call overwrites it.
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        # Initial state: the consumer should not consume until there is a
        # message, so start with the consumer lock already held. The first
        # produce_message call releases it.
        self.consumer_lock.acquire()

    def consume_message(self, name):  # for consumer
        """Block until a message has been produced, then return it.

        Args:
            name: Label of the caller; used only in the debug log output.

        Returns:
            The value currently stored in ``self.message``.
        """
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        # Copy the slot before letting the producer overwrite it.
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def produce_message(self, message, name):  # for producer
        """Block until the slot is free, then store ``message`` in it.

        Args:
            message: Value to place in the single-element slot.
            name: Label of the caller; used only in the debug log output.
        """
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        # Wake the consumer: a message is now available.
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
def main():
    """Run one producer and one consumer thread over a shared Pipeline."""
    # Named log_format so the builtin ``format`` is not shadowed.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
    # Uncomment to see the debug-level lock tracing emitted by Pipeline:
    # logging.getLogger().setLevel(logging.DEBUG)
    pipeline = Pipeline()
    # Leaving the ``with`` block waits for both submitted callables to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment