Skip to content

Instantly share code, notes, and snippets.

@jmbjorndalen
Created April 26, 2018 08:10
Show Gist options
  • Save jmbjorndalen/e1cbd93c475792c83f79ef475345ed00 to your computer and use it in GitHub Desktop.
Save jmbjorndalen/e1cbd93c475792c83f79ef475345ed00 to your computer and use it in GitHub Desktop.
Combining Python 3 asyncio coroutines with thread pool and process pool executors
#!/usr/bin/env python3
# Combining coroutines running in an asyncio event loop with
# blocking tasks in thread pool and process pool executors.
#
# Based on https://pymotw.com/3/asyncio/executors.html, but this version runs both
# threads and processes at the same time and interleaves them with asyncio coroutines.
#
# All appears to be working.
#
import asyncio
import concurrent.futures
import logging
import sys
import time
def block_task(prefix, n):
"""A blocking task to be executed in a thread or process executor"""
log = logging.getLogger(f'{prefix}_blocks({n})')
log.info('running')
time.sleep(1.0)
log.info('done')
return f'b{n ** 2}'
async def async_task(prefix, n):
"""A coroutine intended to run in the asyncio event loop to verify that it
works concurrently with the blocking tasks"""
log = logging.getLogger(f'{prefix}_asyncio({n})')
for i in range(5):
log.info(f'running {i}')
await asyncio.sleep(0.5)
log.info('done')
return f'a{n ** 2}'
async def run_tasks(prefix, executor):
"""Runs blocking tasks in the executor and spawns off a few coroutines to run
concurrently with the blocking tasks."""
log = logging.getLogger(f'{prefix}_run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, block_task, prefix, i)
for i in range(6)
] + [async_task(prefix, i) for i in range(3)]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(threadName)-25s %(name)-25s: %(message)s',
stream=sys.stderr,
)
th_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
pr_executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
event_loop = asyncio.get_event_loop()
try:
w = asyncio.wait([run_tasks('th', th_executor),
run_tasks('pr', pr_executor)
])
event_loop.run_until_complete(w)
finally:
event_loop.close()
@mrinalkamboj
Copy link

In this code, in case I create a thread pool executor to be executed inside the Async method without using the asyncio.get_event_loop(), will that be correct. I see that too executing using thread pool just that I have use map or submit methods to coordinate the results of the loing running tasks, please check following code, is this correct, https://ideone.com/lDVLFh

@jmbjorndalen
Copy link
Author

jmbjorndalen commented Mar 26, 2021

The get_event_loop() call is there to get access to the loop's run_in_executor() method. The idea is to be able to ship blocking or time-consuming tasks over to thread or process pools without blocking any of the other asyncio tasks. The code that you're linking to will, as far as I understand it, not yield back control to the event loop while you wait for the futures to complete. You can test this by adding the following async function to your code:

async def async_task():
    for i in range(10):
        print(f"async running {i}")
        await asyncio.sleep(0.2)
    print("async done")
    return "I'm done"

And modifying the run statement to run both the ExecuteSleep and async_task at the same time:

    results  = asyncio.run(asyncio.wait([
        async_task(),
        ExecuteSleep()
        ]))

What you will probably observere here is that async_task gets to print one 'running' line before ExecuteSleep takes over, and that async_task has to wait for ExecuteSleep to finish before continuing the loop. I have verified that this works as I would expect in Python 3.8 and 3.9 at least.

It doesn't mean that the code in your link is wrong, it just doesn't do what I tried to do in this gist.

Have I understood your question correctly?

@mrinalkamboj
Copy link

mrinalkamboj commented Apr 9, 2021

Thanks for taking time out to reply, somehow there's no way to get the notification, therefore couldn't check earlier.
I have understood your view and can get it why it would be blocking. I have modified the ideone code, however one more question as you can see, following line is commented, x,y = asyncio.run(asyncio.wait(tasks)), since I am not able to fetch the result using the code you have provided. It infact unwraps result of both functions in a task array in the variable x, nothing in variable y. Instead the following one helps in easy unwrapping results of the two functions in two varibles, x,y = loop.run_until_complete(asyncio.gather(*tasks)).

Do you have a way for me to unwrap result using the example you have provided.

@jmbjorndalen
Copy link
Author

asyncio.wait has an optional timeout parameter, so all of the coroutines may not have completed when it returns. If you check the return value it returns two lists (done and pending).
If you don't want the timeout, it's probably better to use the gather (as in your example).

@mrinalkamboj
Copy link

Understood, thank you for help and clarity, your code shows complete,pending option

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment