Base code for a multi-producer (worker), single-consumer (writer) pattern: workers push results into a shared queue while a single writer process consolidates them into one output file.
from multiprocessing import Process, Manager, Pool
from multiprocessing.managers import AutoProxy
from typing import Any, List, Optional, Tuple

# Sentinel pushed into the queue to tell the writer that no more results are coming.
END_OF_QUEUE = None


class WorkersPool:
    def __init__(self, processes: Optional[int] = None) -> None:
        """Manage and run workers for a parallelizable task.

        Args:
            processes (Optional[int], optional): Number of concurrent workers.
                Defaults to None (one process per CPU core).
        """
        # A manager queue is used (instead of multiprocessing.Queue) because its
        # proxy can be pickled and passed to Pool workers.
        self.manager = Manager()
        self.output_queue = self.manager.Queue()
        self.processes = processes
        self.pool = Pool(self.processes)
    def run_async(self, run_args: Any) -> None:
        """Run a task asynchronously for each prepared set of arguments.

        Args:
            run_args (Any): Custom arguments to set up the workers' arguments.
        """
        tasks_args = self.prepare_args(run_args)
        total = len(tasks_args)
        self.results = []
        for i in range(total):
            task_args = tasks_args.pop(0)
            worker_arg = (i, self.output_queue, task_args)
            self.results.append(
                self.pool.apply_async(self.task, (worker_arg,))
            )
            del task_args
    def notify_finish(self) -> List[Any]:
        """Wait until all workers have finished, then notify the writer through
        the queue.

        Returns:
            List[Any]: List of the tasks' returned outputs.
        """
        results = [res.get() for res in self.results]
        self.output_queue.put(END_OF_QUEUE)
        return results
    @staticmethod
    def prepare_args(run_args: Any) -> List[Any]:
        """Set up the workers' arguments as a flattened list of tuples.

        Args:
            run_args (Any): Custom arguments to set up the workers' arguments.

        Returns:
            List[Any]: Each list item contains the arguments for a single
                worker.
        """
        # Example:
        # return [(i,) for i in range(50)]
        raise NotImplementedError
    @staticmethod
    def task(worker_args: Tuple[int, AutoProxy, Any]) -> Any:
        """The concurrent task. The main results should be sent to the writer
        with `output_queue.put`. Simpler or secondary outputs can be returned.

        Args:
            worker_args (Tuple): Worker id, queue, and task arguments.

        Returns:
            Any: A secondary result.
        """
        worker_id: int = worker_args[0]
        output_queue: AutoProxy = worker_args[1]
        task_args: Any = worker_args[2]
        # Example:
        # import random
        # import time
        # sleep = random.randint(1, 10)
        # time.sleep(sleep)
        # result = {'worker_id': worker_id, 'args': task_args, 'sleep': sleep}
        # output_queue.put(result)
        # return sleep
        raise NotImplementedError
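

# Illustrative sketch (not part of the original template): a concrete subclass
# following the commented examples above. Each worker sleeps a random amount of
# time, pushes its main result to the writer queue, and returns the sleep time
# as a secondary result. The class name and the hard-coded 10 tasks are
# assumptions for demonstration only.
class ExampleWorkersPool(WorkersPool):
    @staticmethod
    def prepare_args(run_args: Any) -> List[Any]:
        # One argument tuple per worker; `run_args` is ignored in this sketch.
        return [(i,) for i in range(10)]

    @staticmethod
    def task(worker_args: Tuple[int, AutoProxy, Any]) -> Any:
        import random
        import time
        worker_id, output_queue, task_args = worker_args
        sleep = random.randint(1, 3)
        time.sleep(sleep)
        # Main result: consumed by the writer through the shared queue.
        output_queue.put({'worker_id': worker_id, 'args': task_args, 'sleep': sleep})
        # Secondary result: collected by `notify_finish` in the parent process.
        return sleep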


class Writer(Process):
    def __init__(self, output_filepath: str, queue: AutoProxy) -> None:
        """Consolidate the results into a single output file.

        Args:
            output_filepath (str): File with the run results.
            queue (AutoProxy): A WorkersPool queue with the workers' results.
        """
        super().__init__()
        self.output_filepath = output_filepath
        self.queue = queue
        self.written_results = 0
    def before_write(self) -> None:
        """It runs before writing any result."""
        # Example:
        # print('Waiting for write')
        # with open(self.output_filepath, 'w') as f:
        #     f.write('Starting\n')
        raise NotImplementedError

    def after_write(self) -> None:
        """It runs after writing all the results."""
        # Example:
        # print('Queue ended')
        # with open(self.output_filepath, 'a') as f:
        #     f.write('Ending\n')
        raise NotImplementedError

    def write_result(self, result: Any) -> None:
        """Write a single result instance.

        Args:
            result (Any): A worker result.
        """
        # Example:
        # with open(self.output_filepath, 'a') as f:
        #     f.write(f'{result}\n')
        raise NotImplementedError
    def run(self) -> None:
        """The writing loop. It waits for new results and writes them into the
        output file. When the queue pops an `END_OF_QUEUE` sentinel, it stops
        writing.
        """
        self.before_write()
        while True:
            result = self.queue.get()
            if result is END_OF_QUEUE:
                break
            self.written_results += 1
            print(f'Writing result {self.written_results}')
            self.write_result(result)
            del result
        self.after_write()
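

# Illustrative companion sketch (not part of the original template): a concrete
# Writer following the commented examples above, appending each result as one
# line of text. The class name is an assumption for demonstration only.
class ExampleWriter(Writer):
    def before_write(self) -> None:
        # Truncate the file and mark the start of the run.
        with open(self.output_filepath, 'w') as f:
            f.write('Starting\n')

    def after_write(self) -> None:
        with open(self.output_filepath, 'a') as f:
            f.write('Ending\n')

    def write_result(self, result: Any) -> None:
        # Append one line per result; reopening keeps each write flushed.
        with open(self.output_filepath, 'a') as f:
            f.write(f'{result}\n')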


def main(output_filepath: str, processes: Optional[int] = None) -> List[Any]:
    workers_pool = WorkersPool(processes)
    writer = Writer(output_filepath, queue=workers_pool.output_queue)
    writer.start()
    print('Pooling tasks')
    # Write your custom args
    run_args = None
    workers_pool.run_async(run_args)
    results = workers_pool.notify_finish()
    print('Waiting for the writer')
    writer.join()
    print('Done!')
    return results


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Run a Multi-Producer/Single-Consumer job.')
    parser.add_argument('--processes', type=int, default=None,
                        help='Number of processes for the producers.')
    parser.add_argument('--output_filepath', type=str,
                        help="Consumer file to write the producers' results.")
    args = parser.parse_args()
    results = main(args.output_filepath, args.processes)
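
# Example usage (sketch): wiring the illustrative Example* subclasses above by
# hand, since `main` is hard-wired to the abstract base classes.
#
#   workers_pool = ExampleWorkersPool(processes=4)
#   writer = ExampleWriter('results.txt', queue=workers_pool.output_queue)
#   writer.start()
#   workers_pool.run_async(run_args=None)
#   results = workers_pool.notify_finish()  # unblocks the writer via END_OF_QUEUE
#   writer.join()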