Python multiprocessing multiple-queue implementation: a writer process feeds an input queue, a pool of worker processes transforms records onto an output queue, and a single reader process drains the results.
from multiprocessing import Pool, Process, Manager
import time
import logging
import os
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] [%(asctime)s] [%(process)s] %(message)s', datefmt='%d/%m/%Y %I:%M:%S %p')

def writer(queue):
    """ Queue writer worker """
    # Produce data into the queue.
    for i in range(10000):
        t = time.time()
        record = {'message': i, 'time': t}
        logging.info(record)
        queue.put(record)
    # Sentinel: signal the workers that production is finished.
    queue.put(None)

def process(input_queue, output_queue):
    """ Queue reader worker """
    has_next = True
    while has_next:
        # Read the next message from input_queue.
        message = input_queue.get()
        # A sentinel stops this worker.
        if message is None:
            has_next = False
        else:
            output_queue.put({'process pid': os.getpid(), 'message': message, 'queue-size': input_queue.qsize()})
    # Re-insert the sentinel so the other workers stop as well.
    input_queue.put(None)
    # Tell the reader that this worker has stopped.
    output_queue.put(None)
    return True

def reader(output_queue, process_count):
    """ Final reader: drains output_queue until every worker has stopped. """
    running_process = process_count
    while running_process:
        # Read the next message from the queue.
        message = output_queue.get()
        # Each sentinel means one worker has finished.
        if message is None:
            running_process -= 1
        else:
            logging.info({'reader': 1, 'message': message, 'queue-size': output_queue.qsize()})
    return True

if __name__ == '__main__':
    # Create a manager: manager queues (unlike plain multiprocessing.Queue)
    # can be passed as arguments to pool workers.
    manager = Manager()
    # Create bounded queues; maxsize applies backpressure on the producer.
    input_queue = manager.Queue(maxsize=20)
    output_queue = manager.Queue(maxsize=20)
    process_count = 5
    # Writer process.
    writer_proc = Process(name="writer-process", target=writer, args=(input_queue,))
    writer_proc.start()
    # Final reader process.
    reader_proc = Process(name="reader-process", target=reader, args=(output_queue, process_count))
    reader_proc.start()
    # Create a pool of parallel workers and start all of them.
    with Pool(processes=process_count, maxtasksperchild=1) as pool:
        processes = [pool.apply_async(func=process, args=(input_queue, output_queue)) for _ in range(process_count)]
        logging.info([r.get() for r in processes])
    reader_proc.join()
    writer_proc.join()
    manager.shutdown()
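
The shutdown here relies on sentinel (poison-pill) propagation: the writer puts a single None, and each worker that consumes it re-inserts it before exiting, so one sentinel eventually stops every worker. Below is a minimal stand-alone sketch of just that trick; the names consumer, q, and done_q are hypothetical and not part of the gist above.

from multiprocessing import Manager, Process

def consumer(q, done_q):
    while True:
        item = q.get()
        if item is None:
            # Re-insert the sentinel so sibling consumers also stop.
            q.put(None)
            # Report this consumer's shutdown.
            done_q.put(None)
            break

if __name__ == '__main__':
    manager = Manager()
    q = manager.Queue()
    done_q = manager.Queue()
    workers = [Process(target=consumer, args=(q, done_q)) for _ in range(3)]
    for w in workers:
        w.start()
    q.put(None)          # a single sentinel from the producer...
    for _ in range(3):
        done_q.get()     # ...still shuts down all three consumers
    for w in workers:
        w.join()
    manager.shutdown()

One sentinel always remains in q afterwards (the last consumer re-inserts it with no one left to read it), which is harmless here because the manager is shut down; the main script above behaves the same way with its leftover None on input_queue.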