Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ArtyomKozyrev8/3684acf1cb102823604efeb65605e016 to your computer and use it in GitHub Desktop.
Save ArtyomKozyrev8/3684acf1cb102823604efeb65605e016 to your computer and use it in GitHub Desktop.
call of children processes in child process of the process with asyncio app inside
import asyncio
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, List
from time import sleep, monotonic
from random import randint
from functools import partial
TASKS_NUMBER = 9
def handle_heavy_task(task: int) -> int:
"""Imagine this is CPU heavy task"""
sleep(10)
return task ** 2
def heavy_tasker(tasks: List[int]) -> List[int]:
"""Distributes tasks among the child process children processes"""
with ProcessPoolExecutor(max_workers=4) as tasker_pool:
r = tasker_pool.map(handle_heavy_task, tasks)
return list(r)
async def worker(executor: ProcessPoolExecutor, func: Callable):
"""Async worker"""
loop = asyncio.get_event_loop()
r = await loop.run_in_executor(executor=executor, func=func)
return r
async def ticker():
"""Just to prove that it is async code"""
while True:
await asyncio.sleep(5)
print("Still working!")
async def amain():
"""Async main entrypoint of the code snippet"""
tasks = [
[randint(1, 10) for _ in range(4)] for _ in range(TASKS_NUMBER)
]
print(f"Tasks: {tasks}")
asyncio.create_task(ticker()) # will be never awaited
with ProcessPoolExecutor(max_workers=4) as pool:
results = await asyncio.gather(
*[worker(pool, partial(heavy_tasker, task)) for task in tasks]
)
print(f"Results: {results}")
if __name__ == '__main__':
start_t = monotonic()
asyncio.run(amain())
print(f"Duration: {monotonic() - start_t}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment