Last active
June 26, 2021 19:25
-
-
Save scalactic/a8930f0b23194fbd31e3fe2b1147a193 to your computer and use it in GitHub Desktop.
Python multiprocessing implementation over multiprocessing.Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time
from multiprocessing import Manager, Pool, Process
def writer(queue, count=10000):
    """Produce ``count`` timestamped messages into ``queue``, then a stop marker.

    Each message is a dict ``{'message': <index>, 'time': <time.time()>}``.
    After the last message a single ``None`` is enqueued as the end-of-stream
    sentinel for whoever consumes the queue.

    Args:
        queue: Queue-like object providing ``put()`` and ``qsize()``.
        count: Number of messages to produce before the sentinel
            (defaults to 10000).
    """
    for i in range(count):
        t = time.time()
        # Log what is about to be enqueued together with the current backlog.
        print({'message': i, 'time': t, 'queue_size': queue.qsize()})
        queue.put({'message': i, 'time': t})
    # End-of-stream marker: tells consumers that no more data will arrive.
    queue.put(None)
def reader(queue, reader_no):
    """Consume messages from ``queue`` until the ``None`` sentinel is seen.

    Every non-sentinel message is printed together with this reader's number
    and the current queue size. When the sentinel is received, it is put back
    on the queue before returning, so that the next reader blocked on
    ``get()`` also sees it and stops.

    Args:
        queue: Queue-like object providing ``get()``, ``put()`` and ``qsize()``.
        reader_no: Identifier of this reader, used only in the printed output.

    Returns:
        Always ``True`` once the sentinel has been handled.
    """
    while True:
        # Blocks until a message (or the sentinel) is available.
        message = queue.get()
        if message is None:
            break
        print({'reader-no': reader_no, 'message': message, 'queue-size': queue.qsize()})
    # Re-publish the sentinel: each stopping reader wakes up and stops another.
    queue.put(None)
    return True
if __name__ == '__main__':
    num_readers = 10

    # Using the manager as a context manager shuts its server process down
    # even when a reader raises, instead of only on the success path.
    with Manager() as manager:
        # Bounded shared queue: the writer blocks once 20 items are pending.
        _queue = manager.Queue(maxsize=20)

        # Keep a handle on the writer process so it can be joined below.
        writer_process = Process(name="writer-process", target=writer, args=(_queue,))
        writer_process.start()

        # One pool worker per reader; the number of simultaneously running
        # readers is constrained to the pool size.
        with Pool(processes=num_readers, maxtasksperchild=1) as pool:
            readers = [
                pool.apply_async(func=reader, args=(_queue, reader_no))
                for reader_no in range(num_readers)
            ]
            # Block until every reader has seen the sentinel and returned.
            print([r.get() for r in readers])

        # The writer has already enqueued its sentinel by now; reap the process
        # before the manager (and with it the queue) goes away.
        writer_process.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
An example output:
![resim](https://user-images.githubusercontent.com/19242970/123220937-14fb4200-d4d7-11eb-82b4-bdd165398efa.png)