Skip to content

Instantly share code, notes, and snippets.

@wil3
Created November 10, 2017 02:32
Show Gist options
  • Save wil3/0e20a898f7d29bc987a2dbe7159d3065 to your computer and use it in GitHub Desktop.
Python 2 Producer/Consumer demonstration
"""
Introduce producer/consumer model with thread safe queues
This demo can be first illustrated as an M/M/K queue in which we have incoming
messages at some rate added to a single queue, each consumer thread will
continually get and process items from the queue.
This is to demonstrate tradeoffs with the queue size, and number
of workers to use during processing and how it affects throughput performance.
"""
__author__ = "William Koch"
__email__ = "wfkoch [at] bu.edu"
# Import libraries we will be using
# This library is for multi-threading
import threading
# For Python 3
#import queue
import Queue
#Lets log stuff
import logging
#Helper stuff
import time
import random
import string
import hashlib
# Root logging config at DEBUG so per-thread producer/consumer activity is visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("main")
logger_p = logging.getLogger("Producer")
logger_c = logging.getLogger("Consumer")
# Global run flag polled by every thread's run() loop; the main thread
# sets it False to stop the simulation.
RUNNING = True
# Create our base case
class SimBase(threading.Thread):
    """Common base for simulation threads.

    Holds the shared queue, the event rate (events/second, used for the
    exponential inter-event sleeps in subclasses), and a counter of items
    this thread has handled.
    """

    def __init__(self, q, rate):
        # Per-thread flag; the module-level RUNNING gate actually stops the loops.
        self.running = True
        # Items processed so far by this thread (read by main for throughput).
        self.count = 0
        self.rate = rate
        self.q = q
        super(SimBase, self).__init__()
class SimProducer(SimBase):
    """Producer thread: enqueues random messages at exponentially
    distributed intervals (i.e. a Poisson arrival process).

    Messages that arrive while the queue is full are dropped and logged.
    """

    def __init__(self, q, rate):
        super(SimProducer, self).__init__(q, rate)

    def _generate_message(self):
        # A message is just a random integer rendered as a string.
        return str(random.randint(0, 1000))

    def run(self):
        while RUNNING:
            message = self._generate_message()
            # put_nowait + Queue.Full closes the check-then-act race the
            # original full()/put() pair had: the queue could fill between
            # the check and the put.
            try:
                self.q.put_nowait(message)
                logger_p.info("{} Producing {}".format(threading.current_thread().name, message))
                logger_p.debug("{} Q = {}".format(threading.current_thread().name, self.q.queue))
                self.count += 1
            except Queue.Full:
                # warning() is the supported spelling; warn() is a deprecated alias.
                logger_p.warning("{} Q Full! Dropping {}".format(threading.current_thread().name, message))
            # Inter-event times of a Poisson process are exponentially distributed.
            wait = random.expovariate(self.rate)
            logger_p.debug("{} Sleeping for = {}".format(threading.current_thread().name, wait))
            time.sleep(wait)
        logger_p.debug("{} Done".format(threading.current_thread().name))
class DistributedPOW(SimBase):
    """Consumer thread: pulls messages off the shared queue, performs a
    sha256 proof-of-work (find a nonce whose hash has `difficulty` leading
    zeros), then simulates sending the result.
    """

    def __init__(self, q, rate, difficulty):
        # Number of leading '0' hex digits a valid hash must have.
        self.difficulty = difficulty
        super(DistributedPOW, self).__init__(q, rate)

    def _pow(self, message):
        """Return (hash, nonce) such that sha256(message + nonce) passes
        valid_prefix().

        Bug fix: the original rebound `message` on every iteration, so the
        hashed string accumulated every tried nonce (message+0, message+0+1,
        ...) and, because the counter was bumped before the check, the
        returned nonce was one past the nonce actually hashed.  Each
        candidate is now built from the untouched input, and the winning
        nonce itself is returned.
        """
        nonce = 0
        while True:
            candidate = "{}{}".format(message, nonce).encode("utf-8")
            h = hashlib.sha256(candidate).hexdigest()
            if self.valid_prefix(h):
                return (h, nonce)
            nonce += 1

    def _send(self, message, h, nonce):
        # Simulate waiting on an external resource (network, database, ...)
        # to do something with the POW.  While sleeping we hold no CPU, so
        # other threads computing POWs can run -- this is why extra threads
        # raise throughput here even though they don't add parallelism.
        wait = random.expovariate(self.rate)
        logger_c.debug("{} Sleeping for = {}".format(threading.current_thread().name, wait))
        time.sleep(wait)

    def valid_prefix(self, hash):
        """Determine if the hash starts with `difficulty` zero characters."""
        return hash[:self.difficulty] == "0" * self.difficulty

    def run(self):
        while RUNNING:
            # Non-blocking get: blocking here could deadlock at shutdown.
            try:
                message = self.q.get(block=False)
            except Queue.Empty:
                # Per the docs, empty() is only advisory, so Empty can
                # always be raised; just retry.
                continue
            logger_c.info("{} Consuming {}".format(threading.current_thread().name, message))
            (h, nonce) = self._pow(message)
            self._send(message, h, nonce)
            logger_c.info("{} Created POW ({}, {}, {})".format(threading.current_thread().name, message, nonce, h))
            self.count += 1
        logger_c.debug("{} Done".format(threading.current_thread().name))
if __name__ == "__main__":
    random.seed(3)

    # Simulation parameters.
    BUFF_SIZE = 10
    SIM_TIME = 10.0
    difficulty = 3
    num_consumers = 10
    # A fast producer overflows the queue; compensate by adding workers
    # or by shrinking the per-item processing time.
    producer_rate = 1  # 0.1
    consumer_rate = 1  # 3.0

    # Build the shared queue and the thread pool.
    q = Queue.Queue(BUFF_SIZE)
    producer = SimProducer(q, 1.0 / producer_rate)
    consumers = [DistributedPOW(q, 1.0 / consumer_rate, difficulty)
                 for _ in range(num_consumers)]

    # Start everything.
    producer.start()
    for worker in consumers:
        worker.start()

    # Let the simulation run for SIM_TIME seconds, polling once per second,
    # then clear the global flag so every thread's loop exits.
    start_time = time.time()
    end_time = start_time + SIM_TIME
    while time.time() <= end_time:
        time.sleep(1)
    RUNNING = False

    # Join all threads and report throughput.  Although threads may not add
    # CPU parallelism in some applications, they can still raise throughput
    # -- vary the number of consumers and the rates to see this.
    producer.join()
    produce_count = producer.count
    consumer_count = 0
    for worker in consumers:
        worker.join()
        consumer_count += worker.count
    logger.info("Tput P={} C={}".format(produce_count / SIM_TIME, consumer_count / SIM_TIME))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment