Skip to content

Instantly share code, notes, and snippets.

@freol35241
Created March 21, 2024 12:51
Show Gist options
  • Save freol35241/177f79e90e17763385a7dac390b90a3b to your computer and use it in GitHub Desktop.
Save freol35241/177f79e90e17763385a7dac390b90a3b to your computer and use it in GitHub Desktop.
Multiprocessing with multiprocess library
from dataclasses import dataclass, field
from typing import Tuple, Callable, Dict, Any, Hashable, List
from uuid import uuid4

from multiprocess.pool import Pool
from tqdm import tqdm
@dataclass
class Task:
    """A deferred call: a callable plus the arguments to invoke it with.

    Instances are consumed by ``run_in_process_pool``, which calls
    ``func(*args, **kwargs, context=...)`` in a worker process, so ``func``
    must accept a ``context`` keyword argument.
    """

    # The callable to execute.
    func: Callable
    # Positional arguments forwarded to ``func``.
    args: Tuple = ()
    # Keyword arguments forwarded to ``func``; every instance gets its own
    # fresh dict so tasks never share (and mutate) a common default.
    kwargs: Dict[str, Any] = field(default_factory=dict)
def run_in_process_pool(
    tasks: Dict[Hashable, Task],
    no_of_processes: int | None = None,
    context: Dict[str, Any] | None = None,
) -> Dict[Hashable, Any]:
    """Run every task in a process pool and collect the results by key.

    Each task is submitted with ``Pool.apply_async`` and executed in a worker
    as ``task.func(*task.args, **task.kwargs, context=context)``. A tqdm
    progress bar is advanced from the pool's success callback, and the
    traceback of a failing task is printed from the error callback.

    Args:
        tasks: Mapping from an arbitrary hashable key to the task to run.
        no_of_processes: Number of worker processes, passed straight to
            ``Pool``; ``None`` leaves the choice to the pool.
        context: Optional object handed to every task through the
            ``context`` keyword argument. It is installed once per worker by
            the pool initializer. Tasks receive ``context=None`` when it is
            omitted.

    Returns:
        A dict with the same keys as ``tasks`` mapping to each task's return
        value.

    Raises:
        Exception: Whatever a task raised is re-raised by
            ``AsyncResult.get()`` while the results are collected; this
            happens only after all tasks have finished.
    """
    # Unique *string* key under which each worker stores the context in its
    # module globals. A string (rather than a raw UUID object) keeps the
    # module namespace sortable, e.g. for ``dir()``.
    ctx_key = f"_run_in_process_pool_context_{uuid4().hex}"

    def _initializer():
        # Always publish the context, even when it is None, so the lookup in
        # the wrapped task can never fail with a KeyError.
        globals()[ctx_key] = context
        print("Process initialized successfully!")

    def _wrap_task(task: Task):
        def _wrapped_task():
            # Read the context installed by the initializer instead of
            # closing over the outer ``context`` variable.
            # NOTE(review): presumably this avoids shipping the context with
            # every submitted task - confirm against the pickling behaviour
            # of the multiprocess library.
            worker_context = globals()[ctx_key]
            return task.func(*task.args, **task.kwargs, context=worker_context)

        return _wrapped_task

    with tqdm(total=len(tasks)) as progress_bar:

        def _progress_callback(*args, **kwargs):
            progress_bar.update(1)

        def _error_callback(the_error: BaseException):
            import traceback

            traceback.print_exception(the_error)
            print()

        print("Starting sub-processes...")
        with Pool(no_of_processes, initializer=_initializer) as pool:
            apply_results = {
                key: pool.apply_async(
                    _wrap_task(task),
                    callback=_progress_callback,
                    error_callback=_error_callback,
                )
                for key, task in tasks.items()
            }

            # Let every task finish (successfully or not) before collecting
            # anything. The timed wait returns as soon as the result is ready
            # while still waking up regularly so the main thread stays
            # responsive to interrupts.
            for pending in apply_results.values():
                while not pending.ready():
                    pending.wait(1)

            # ``get()`` re-raises the exception of a failed task.
            results = {key: pending.get() for key, pending in apply_results.items()}

    # Leaving the ``with tqdm(...)`` block closes the progress bar.
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment