Skip to content

Instantly share code, notes, and snippets.

@scalactic
Last active June 26, 2021 19:25
Show Gist options
  • Save scalactic/a8930f0b23194fbd31e3fe2b1147a193 to your computer and use it in GitHub Desktop.
Save scalactic/a8930f0b23194fbd31e3fe2b1147a193 to your computer and use it in GitHub Desktop.
Python multiprocessing implementation over multiprocessing.Queue
from multiprocessing import Pool, Process, Manager
import time
def writer(queue, count=10000):
    """Queue writer worker.

    Produce ``count`` messages into ``queue``, printing each one together
    with the queue size observed just before it is enqueued, then post a
    single ``None`` sentinel to signal the end of the stream.

    Args:
        queue: Queue-like object providing ``put()`` and ``qsize()``.
        count: Number of messages to produce. Defaults to 10000.
    """
    for i in range(count):
        t = time.time()
        # Build the payload once so the printed record and the enqueued
        # message cannot drift apart.
        message = {'message': i, 'time': t}
        print({**message, 'queue_size': queue.qsize()})
        queue.put(message)

    # End-of-stream marker: sent exactly once, after the last message.
    queue.put(None)
def reader(queue, reader_no):
    """Queue reader worker.

    Consume messages from ``queue`` and print each one until a ``None``
    sentinel is received, then put the sentinel back on the queue so the
    next reader that sees it stops as well.

    Args:
        queue: Queue-like object providing ``get()``, ``put()`` and
            ``qsize()``.
        reader_no: Identifier of this reader, included in every printed
            record.

    Returns:
        Always ``True``, after the sentinel has been seen and re-posted.
    """
    while True:
        # Read the top message from the queue.
        message = queue.get()
        # The None sentinel stops this reader.
        if message is None:
            break
        print({'reader-no': reader_no, 'message': message, 'queue-size': queue.qsize()})

    # Re-post the sentinel: each reader stops the other readers as well.
    queue.put(None)
    return True
if __name__ == '__main__':
    num_readers = 10

    # Use the manager as a context manager so it is shut down on exit from
    # the block, including when a reader raises.
    with Manager() as manager:
        # Create a manager-backed queue with maxsize.
        _queue = manager.Queue(maxsize=20)

        # Writer process. Keep a handle so it can be joined before the
        # manager that serves its queue proxy goes away.
        writer_process = Process(name="writer-process", target=writer, args=(_queue,))
        writer_process.start()

        # Create a multiprocessing pool and start all parallel readers.
        # The number of simultaneously running readers is constrained to the pool size.
        with Pool(processes=num_readers, initializer=None, maxtasksperchild=1) as pool:
            readers = [pool.apply_async(func=reader, args=(_queue, reader_no)) for reader_no in range(num_readers)]
            # Block until every reader has finished and print their results.
            print([r.get() for r in readers])

        # Readers only finish after consuming the writer's sentinel, so the
        # writer has produced everything by now; wait for it to exit cleanly.
        writer_process.join()
@scalactic
Copy link
Author

An example output:
[image]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment