Last active
May 10, 2022 16:34
-
-
Save sborquez/93456ad5ecbf155f2364cea2032f169e to your computer and use it in GitHub Desktop.
Base code for a multi-producer (worker), single-consumer (writer) pattern.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process, Manager, Pool | |
from multiprocessing.managers import AutoProxy | |
from typing import Any, List, Optional, Tuple | |
# Sentinel pushed onto the queue to tell the writer that all workers finished.
END_OF_QUEUE = None
class WorkersPool:
    """Manage and run workers for a parallelizable task.

    Template class: subclasses implement `prepare_args` and `task`.
    Workers push their main results onto a shared managed queue that a
    single consumer (the writer) drains; secondary results are returned
    through `notify_finish`.
    """

    def __init__(self, processes: Optional[int] = None) -> None:
        """Create the process pool and the shared output queue.

        Args:
            processes (Optional[int], optional): Number of concurrent
                workers. Defaults to None (let the pool decide).
        """
        # A Manager queue proxy is used (instead of multiprocessing.Queue)
        # because it can be pickled and handed to pool workers.
        self.manager = Manager()
        self.output_queue = self.manager.Queue()
        self.processes = processes
        self.pool = Pool(self.processes)

    def run_async(self, run_args: Any) -> None:
        """Asynchronously run one task per prepared argument set.

        Args:
            run_args (Any): Custom arguments used by `prepare_args` to
                build the per-worker argument list.
        """
        # Each worker receives (worker_id, queue proxy, its own args).
        self.results = [
            self.pool.apply_async(self.task, ((i, self.output_queue, task_args),))
            for i, task_args in enumerate(self.prepare_args(run_args))
        ]

    def notify_finish(self) -> List[Any]:
        """Wait for all workers, then notify the writer through the queue.

        Also closes and joins the pool so its worker processes are
        released once every task has completed.

        Returns:
            List[Any]: List of the tasks' returned (secondary) outputs.
        """
        # .get() blocks until each task completes (and re-raises errors).
        results = [res.get() for res in self.results]
        # All tasks are done: release the pool's worker processes.
        self.pool.close()
        self.pool.join()
        self.output_queue.put(END_OF_QUEUE)
        return results

    @staticmethod
    def prepare_args(run_args: Any) -> List[Any]:
        """Set up a list of workers' arguments as a flat list of tuples.

        Args:
            run_args (Any): Custom arguments to set up workers' arguments.

        Returns:
            List[Any]: Each list item contains the arguments for a single
                worker.
        """
        # Example:
        #   return [(i,) for i in range(50)]
        raise NotImplementedError

    @staticmethod
    def task(worker_args: Tuple[int, AutoProxy, Any]) -> Any:
        """The concurrent task.

        The main results should be emitted with `output_queue.put`;
        simpler or secondary outputs can be returned.

        Args:
            worker_args (Tuple): Worker id, queue proxy, and task arguments.

        Returns:
            Any: A secondary result.
        """
        worker_id: int = worker_args[0]
        output_queue: AutoProxy = worker_args[1]
        task_args: Any = worker_args[2]
        # Example:
        #   import random
        #   import time
        #   sleep = random.randint(1, 10)
        #   time.sleep(sleep)
        #   result = {'worker_id': worker_id, 'args': task_args, 'sleep': sleep}
        #   output_queue.put(result)
        #   return sleep
        raise NotImplementedError
class Writer(Process):
    """Single consumer process that consolidates workers' results into a file.

    Template class: subclasses implement `before_write`, `write_result`,
    and `after_write`.
    """

    def __init__(self, output_filepath: str, queue: AutoProxy) -> None:
        """Store the destination file and the shared results queue.

        Args:
            output_filepath (str): File with the run results.
            queue (AutoProxy): A WorkersPool queue with the workers' results.
        """
        super().__init__()
        self.output_filepath = output_filepath
        self.queue = queue
        # Count of results written so far (original attribute name kept
        # for backward compatibility).
        self.writed_results = 0

    def before_write(self) -> None:
        """Hook executed once, before any result is written."""
        # Example:
        #   print('Waiting for write')
        #   with open(self.output_filepath, 'w') as f:
        #       f.write(f'Starting\n')
        raise NotImplementedError

    def after_write(self) -> None:
        """Hook executed once, after every result has been written."""
        # Example:
        #   print('Queue ended')
        #   with open(self.output_filepath, 'a') as f:
        #       f.write(f'Ending\n')
        raise NotImplementedError

    def write_result(self, result: Any) -> None:
        """Persist a single result instance.

        Args:
            result (Any): A worker result.
        """
        # Example:
        #   with open(self.output_filepath, 'a') as f:
        #       f.write(f'{result}\n')
        raise NotImplementedError

    def run(self) -> None:
        """Consume the queue, writing each result to the output file.

        Blocks on the queue until results arrive; stops as soon as the
        `END_OF_QUEUE` sentinel is received.
        """
        self.before_write()
        result = self.queue.get()
        while result is not END_OF_QUEUE:
            self.writed_results += 1
            print(f'Writing result {self.writed_results}')
            self.write_result(result)
            result = self.queue.get()
        self.after_write()
def main(output_filepath: str, processes: Optional[int] = None) -> List[Any]:
    """Run the multi-producer / single-consumer job.

    Args:
        output_filepath (str): File where the writer consolidates results.
        processes (Optional[int], optional): Number of worker processes.
            Defaults to None.

    Returns:
        List[Any]: The workers' secondary (returned) outputs.
    """
    workers_pool = WorkersPool(processes)
    writer = Writer(output_filepath, queue=workers_pool.output_queue)
    writer.start()
    print('Pooling tasks')
    # Write your custom args
    run_args = None
    try:
        workers_pool.run_async(run_args)
        results = workers_pool.notify_finish()
    except BaseException:
        # If pooling or a task fails, the sentinel is never enqueued and
        # the writer would block on the queue forever — unblock it first.
        workers_pool.output_queue.put(END_OF_QUEUE)
        raise
    print('Waiting writer')
    writer.join()
    print('Done!')
    return results
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(
        description='Run Multi-Producer/Single Consumer job.')
    parser.add_argument('--processes', type=int, default=None,
                        help='Number of processes for producers.')
    # required=True: omitting the flag previously passed None through to
    # main() silently; now argparse reports a clear usage error instead.
    parser.add_argument('--output_filepath', type=str, required=True,
                        help="Consumer file to write the producers' results.")
    args = parser.parse_args()
    results = main(args.output_filepath, args.processes)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment