Skip to content

Instantly share code, notes, and snippets.

@adhamenaya
Last active June 3, 2023 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adhamenaya/9ee5c27d99995be730885953c5986a7e to your computer and use it in GitHub Desktop.
Save adhamenaya/9ee5c27d99995be730885953c5986a7e to your computer and use it in GitHub Desktop.
import time
import random
import multiprocessing
# consumer class simulate the continuous data collection process
class Producer:
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
print("Producer process started...")
while True:
# simulate the time needed to collect data
input_time = random.randrange(1, 4)
time.sleep(input_time)
# simulate date collection
input_data = 5
self.queue.put_nowait(input_data)
print(f" {input_data} is collected in time {input_time} secs")
# producer class simulates the work of data processing algorithm
class Consumer:
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
process_data = 0
print("Consumer process started...")
while True:
# issue a blocking get, to avoid accessing an empty queue
data = self.queue.get()
# simulate time needed to process data
procss_time = random.randrange(6, 9)
time.sleep(procss_time)
# simulate data processing algorithm
process_data += data
print(f" input: {data}, new result: {process_data} is processed in {procss_time} secs")
if __name__ == "__main__":
# create a shared queue
queue = multiprocessing.Queue()
producer = Producer(queue)
consumer = Consumer(queue)
# start instances on parallel processes
producer_process = multiprocessing.Process(target=producer.run)
consumer_process = multiprocessing.Process(target=consumer.run)
producer_process.start()
consumer_process.start()
# Prevent the main thread from terminating:
producer_process.join()
consumer_process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment