Last active
June 3, 2023 20:17
-
-
Save adhamenaya/9ee5c27d99995be730885953c5986a7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import random | |
import multiprocessing | |
# Producer class simulates the continuous data collection process
class Producer:
    """Simulate a data-collection process that feeds items into a queue.

    Each iteration sleeps for a random whole number of seconds (the
    simulated collection time) and then enqueues one item.

    Args:
        queue: Queue-like object exposing ``put_nowait``; the script passes
            a ``multiprocessing.Queue``.
        data: Value enqueued on every iteration (defaults to ``5``).
        min_delay: Inclusive lower bound, in seconds, of the simulated
            collection time (defaults to ``1``).
        max_delay: Exclusive upper bound, in seconds, of the simulated
            collection time (defaults to ``4``). Must be greater than
            ``min_delay``, otherwise ``random.randrange`` raises
            ``ValueError`` when ``run`` executes.
    """

    def __init__(self, queue, *, data: int = 5, min_delay: int = 1,
                 max_delay: int = 4) -> None:
        super().__init__()
        self.queue = queue
        self.data = data
        self.min_delay = min_delay
        self.max_delay = max_delay

    def run(self, max_items: "int | None" = None) -> None:
        """Collect items and enqueue them.

        Args:
            max_items: Stop after this many items have been enqueued.
                ``None`` (the default) keeps producing forever, which is
                what the worker process started by the script relies on.
        """
        print("Producer process started...")
        produced = 0
        while max_items is None or produced < max_items:
            # simulate the time needed to collect data
            input_time = random.randrange(self.min_delay, self.max_delay)
            time.sleep(input_time)
            # simulate data collection
            input_data = self.data
            # Non-blocking put: with a size-limited queue this raises
            # queue.Full instead of waiting for the consumer to catch up.
            self.queue.put_nowait(input_data)
            print(f" {input_data} is collected in time {input_time} secs")
            produced += 1
# Consumer class simulates the work of the data processing algorithm
class Consumer:
    """Simulate a data-processing algorithm that drains items from a queue.

    Each iteration blocks until an item is available, sleeps for a random
    whole number of seconds (the simulated processing time) and adds the
    item to a running total.

    Args:
        queue: Queue-like object exposing a blocking ``get``; the script
            passes a ``multiprocessing.Queue``.
        min_delay: Inclusive lower bound, in seconds, of the simulated
            processing time (defaults to ``6``).
        max_delay: Exclusive upper bound, in seconds, of the simulated
            processing time (defaults to ``9``). Must be greater than
            ``min_delay``, otherwise ``random.randrange`` raises
            ``ValueError`` when ``run`` executes.
    """

    def __init__(self, queue, *, min_delay: int = 6,
                 max_delay: int = 9) -> None:
        super().__init__()
        self.queue = queue
        self.min_delay = min_delay
        self.max_delay = max_delay

    def run(self, max_items: "int | None" = None):
        """Process queued items, accumulating their sum.

        Args:
            max_items: Stop after this many items have been processed.
                ``None`` (the default) keeps consuming forever, which is
                what the worker process started by the script relies on.

        Returns:
            The accumulated total once ``max_items`` items have been
            processed. Never returns when ``max_items`` is ``None``.
        """
        process_data = 0
        print("Consumer process started...")
        handled = 0
        while max_items is None or handled < max_items:
            # issue a blocking get, to avoid accessing an empty queue
            data = self.queue.get()
            # simulate time needed to process data
            process_time = random.randrange(self.min_delay, self.max_delay)
            time.sleep(process_time)
            # simulate data processing algorithm
            process_data += data
            print(f" input: {data}, new result: {process_data} is processed in {process_time} secs")
            handled += 1
        return process_data
if __name__ == "__main__":
    # Create the queue shared by both worker processes.
    queue = multiprocessing.Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)

    # Run each instance in its own process so they work in parallel.
    producer_process = multiprocessing.Process(target=producer.run)
    consumer_process = multiprocessing.Process(target=consumer.run)
    workers = (producer_process, consumer_process)
    for worker in workers:
        worker.start()

    try:
        # Both workers loop forever, so these joins keep the main process
        # alive until it is interrupted.
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        # Ctrl+C is the only way out of the demo: stop whichever workers
        # are still running and reap them so no child is left behind.
        # terminate() may leave the shared queue unusable, which is
        # acceptable here because the program is exiting anyway.
        for worker in workers:
            if worker.is_alive():
                worker.terminate()
        for worker in workers:
            worker.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment