Skip to content

Instantly share code, notes, and snippets.

@jeanmonet
Last active September 26, 2020 21:37
Show Gist options
  • Save jeanmonet/80cc46be42c97292e810f8a8f71c9118 to your computer and use it in GitHub Desktop.
Save jeanmonet/80cc46be42c97292e810f8a8f71c9118 to your computer and use it in GitHub Desktop.
Convenience function to call (multiple) asynchronous functions from synchronous code (loop runs in separate thread and returns results to the main thread)
"""Call async functions from sync code.
This module provides the convenience function ``sync2async``.
It runs a batch of given coroutines (calls of async functions) from sync code
in one separate thread and returns the results to the main thread.

sync2async function:
    Argument:
        iterable (such as a list) of coroutines
    Returns:
        dictionary mapping coroutines to results
        (each key is a coroutine, the value is the result of that coroutine)
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from queue import SimpleQueue, Empty
from typing import Dict, Iterable, Coroutine, Any
async def _async_result_to_queue(coro: Coroutine, queue: SimpleQueue) -> None:
    """Await ``coro`` and put the pair ``(coro, result)`` on ``queue``.

    The coroutine object itself is stored next to its result so that the
    consumer of the queue can tell which coroutine produced which value.

    Args:
        coro: the coroutine to await.
        queue: queue receiving the ``(coro, result)`` tuple.
    """
    res = await coro
    queue.put((coro, res))
async def _gather_coroutines(coroutines: Iterable[Coroutine], queue: SimpleQueue) -> None:
    """Run all given coroutines concurrently via ``asyncio.gather``.

    Each coroutine is wrapped in ``_async_result_to_queue``, so its result is
    put on ``queue`` as a ``(coroutine, result)`` tuple once it completes.

    Args:
        coroutines: iterable of coroutines to run.
        queue: queue receiving one ``(coroutine, result)`` tuple per coroutine.
    """
    await asyncio.gather(*(_async_result_to_queue(coro, queue) for coro in coroutines))
def _run_async(coroutines: Iterable[Coroutine], queue: SimpleQueue) -> None:
    """Run coroutines from sync code (puts results to a SimpleQueue).

    Blocks the calling thread until every coroutine has finished, because
    ``asyncio.run`` only returns once the coroutine passed to it is done.

    Args:
        coroutines: iterable of coroutines to run.
        queue: queue receiving one ``(coroutine, result)`` tuple per coroutine.
    """
    asyncio.run(_gather_coroutines(coroutines, queue))
def sync2async(coroutines: Iterable[Coroutine]) -> Dict[Coroutine, Any]:
    """Run coroutines in a separate thread and return their results as a dict.

    Usage: sync2async( [coro1(args1), coro2(args2), ...] )

    Results travel from the worker thread back to the calling thread through
    a ``queue.SimpleQueue`` object.

    Args:
        coroutines: iterable (such as a list) of coroutines to run.

    Returns:
        Dictionary with each coroutine as key and its result as value:
        ``res = d[coroutine]``.

    Raises:
        RuntimeError: if a result could not be read back from the queue.
        Exception: any exception raised while running the coroutines is
            re-raised in the calling thread.
    """
    queue = SimpleQueue()
    with ThreadPoolExecutor(max_workers=1) as ex:
        future = ex.submit(_run_async, coroutines, queue)
    # Leaving the ``with`` block waits for the worker to finish. result()
    # re-raises whatever the worker raised; without this call a failing
    # coroutine would be silently dropped and a partial dict returned.
    future.result()
    try:
        # qsize() is documented as approximate, hence the non-blocking get
        # guarded by ``except Empty``.
        pairs = [queue.get(block=False) for _ in range(queue.qsize())]
    except Empty:
        raise RuntimeError(
            'sync2async error: could not obtain result from coroutine (queue empty)') from None
    # map tuples (coro, res) to dictionary d[coro] = res
    return dict(pairs)
if __name__ == '__main__':
    # Testing functionality: run three coroutines with different sleep times
    # and look up each result by its coroutine object.

    async def some_async_task(n):
        """Some async function to test: sleeps twice, then returns list(range(n))."""
        print('Task running with n =', n)
        await asyncio.sleep(n/5)
        print('Inside coro', n)
        await asyncio.sleep(n/5)
        print('Done', n)
        return list(range(n))

    coro3 = some_async_task(30)
    coro1 = some_async_task(10)
    coro2 = some_async_task(20)
    results = sync2async(
        (coro3, coro1, coro2)
    )
    print(results[coro1])
    print(results[coro2])
    print(results[coro3])
    # OUTPUT showing async tasks being run asynchronously:
    # Task running with n = 30
    # Task running with n = 10
    # Task running with n = 20
    # Inside coro 10
    # Inside coro 20
    # Done 10
    # Inside coro 30
    # Done 20
    # Done 30
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment