Last active
September 26, 2020 21:37
-
-
Save jeanmonet/80cc46be42c97292e810f8a8f71c9118 to your computer and use it in GitHub Desktop.
Convenience function to call (multiple) asynchronous functions from synchronous code (loop runs in separate thread and returns results to the main thread)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Call async functions from sync code. | |
This module provides the convenience sync2async function. | |
It runs a bunch of given coroutines (async functions) from sync code in one separate thread, | |
and returns results to the main thread. | |
sync2async function: | |
Argument: | |
iterable (such as list) of coroutines | |
Returns: | |
dictionary mapping coroutines to results | |
(each key is a coroutine, the value is the result of that coroutine) | |
""" | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
from queue import SimpleQueue, Empty | |
from typing import Dict, Iterable, Coroutine, Any | |
async def _async_result_to_queue(coro: Coroutine, queue: SimpleQueue) -> None: | |
"""Await coroutine and put result to queue""" | |
res = await coro | |
queue.put((coro, res,)) | |
async def _gather_coroutines(coroutines: Iterable[Coroutine], queue: SimpleQueue) -> None: | |
"""If multiple coroutines, use asyncio.gather to run the coroutines asynchroneously""" | |
await asyncio.gather(*(_async_result_to_queue(coro, queue) for coro in coroutines)) | |
def _run_async(coroutines: Iterable[Coroutine], queue: SimpleQueue) -> None: | |
"""Run coroutines from sync code (puts results to SimpleQueue)""" | |
asyncio.run(_gather_coroutines(coroutines, queue)) | |
def sync2async(coroutines: Iterable[Coroutine]) -> Dict[Coroutine, Any]: | |
"""Runs async coroutines (list given) in a separate thread and returns result (as dict) | |
Usage: sync2async( [coro1(args1), coro2(args2), ...] ) | |
Returns dictionary with coroutine as key and value as result | |
res = d[coroutine] | |
Uses queue.SimpleQueue object to retrieve result obtaines in asynchroneous code | |
""" | |
queue = SimpleQueue() | |
with ThreadPoolExecutor(max_workers=1) as ex: | |
future = ex.submit(_run_async, coroutines, queue) | |
try: | |
# map tuple (coro, res) to dictionary d[coro] = res | |
return {k: v for k, v in [queue.get(block=False) for _ in range(queue.qsize())]} | |
except Empty: | |
raise RuntimeError( | |
'sync2async error: could not obtain result from coroutine (queue empty)') from None | |
if __name__ == '__main__': | |
# Testing functionnality | |
async def some_async_task(n): | |
"""Some async function to test""" | |
print('Task running with n =', n) | |
await asyncio.sleep(n/5) | |
print('Inside coro', n) | |
await asyncio.sleep(n/5) | |
print('Done', n) | |
return list(range(n)) | |
coro3 = some_async_task(30) | |
coro1 = some_async_task(10) | |
coro2 = some_async_task(20) | |
results = sync2async( | |
(coro3, coro1, coro2) | |
) | |
print(results[coro1]) | |
print(results[coro2]) | |
print(results[coro3]) | |
# OUTPUT showing async tasks being run asynchroneously: | |
# Task running with n = 30 | |
# Task running with n = 10 | |
# Task running with n = 20 | |
# Inside coro 10 | |
# Inside coro 20 | |
# Done 10 | |
# Inside coro 30 | |
# Done 20 | |
# Done 30 | |
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] | |
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] | |
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment