Created
June 26, 2021 19:33
-
-
Save scalactic/ac8121c66354b257e44190945232faa6 to your computer and use it in GitHub Desktop.
Python multiprocessing multiple queue implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool, Process, Manager | |
import time | |
import logging | |
import os | |
# Root logger setup: every record carries level, timestamp and the emitting
# process id, so output from the different worker processes can be told apart.
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] [%(asctime)s] [%(process)s] %(message)s',
    datefmt='%d/%m/%Y %I:%M:%S %p',
)
def writer(queue, count=10000):
    """Produce numbered records into ``queue`` followed by a ``None`` sentinel.

    Each record is a dict ``{'message': <index>, 'time': <time.time()>}``.
    Every record is logged at INFO level before it is enqueued.

    Args:
        queue: Object exposing a ``put(item)`` method (e.g. a manager queue).
        count: Number of records to produce. Defaults to 10000, the value
            that was previously hard-coded.
    """
    for i in range(count):
        record = {'message': i, 'time': time.time()}
        logging.info(record)
        queue.put(record)
    # A single None marks the end of the stream for whoever consumes the queue.
    queue.put(None)
def process(input_queue, output_queue):
    """Forward messages from ``input_queue`` to ``output_queue`` until a sentinel.

    Every non-``None`` message is wrapped in a dict holding the current
    process id, the message itself and the input queue size observed right
    after the message was taken, and that dict is put on ``output_queue``.

    When ``None`` is read the loop ends; ``None`` is then put back on
    ``input_queue`` (so another consumer of the same queue also sees the
    sentinel) and one ``None`` is put on ``output_queue`` to announce that
    this worker has finished.

    Returns:
        Always ``True``.
    """
    while True:
        item = input_queue.get()
        if item is None:
            # End-of-stream sentinel: stop consuming.
            break
        output_queue.put({
            'process pid': os.getpid(),
            'message': item,
            'queue-size': input_queue.qsize(),
        })
    # Re-publish the sentinel so the remaining readers of input_queue stop too.
    input_queue.put(None)
    # Announce downstream that this worker is done.
    output_queue.put(None)
    return True
def reader(output_queue, process_count):
    """Drain ``output_queue`` until ``process_count`` sentinels have been seen.

    Each ``None`` taken from the queue decrements the number of producers
    still considered running; any other message is logged at INFO level
    together with the queue size observed at that moment.

    Args:
        output_queue: Queue-like object exposing ``get()`` and ``qsize()``.
        process_count: Number of ``None`` sentinels to wait for.

    Returns:
        Always ``True`` once the expected number of sentinels has arrived.
    """
    remaining = process_count
    while remaining:
        item = output_queue.get()
        if item is not None:
            logging.info({'reader': 1, 'message': item, 'queue-size': output_queue.qsize()})
            continue
        # A None means one upstream worker has finished.
        remaining -= 1
    return True
if __name__ == '__main__':
    # Number of parallel workers moving messages from input to output queue.
    process_count = 5

    # Using the manager as a context manager guarantees that its server
    # process is shut down even if something below raises (for example when
    # r.get() re-raises an exception from a worker).
    with Manager() as manager:
        # Bounded queues: producers block once 20 items are pending.
        input_queue = manager.Queue(maxsize=20)
        output_queue = manager.Queue(maxsize=20)

        # Writer process: fills input_queue and terminates it with None.
        writer_proc = Process(name="writer-process", target=writer, args=(input_queue,))
        writer_proc.start()

        # Final process: logs everything arriving on output_queue until every
        # worker has sent its None sentinel.
        reader_proc = Process(name="reader-process", target=reader, args=(output_queue, process_count))
        reader_proc.start()

        # Pool of parallel workers. Results are collected inside the `with`
        # because leaving the Pool context terminates the worker processes.
        with Pool(processes=process_count, maxtasksperchild=1) as pool:
            processes = [
                pool.apply_async(func=process, args=(input_queue, output_queue))
                for _ in range(process_count)
            ]
            logging.info([r.get() for r in processes])

        reader_proc.join()
        writer_proc.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment