Skip to content

Instantly share code, notes, and snippets.

@iedmrc
Forked from phizaz/async.py
Last active August 21, 2020 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iedmrc/2fbddeb8ca8df25356d8acc3d297e955 to your computer and use it in GitHub Desktop.
Save iedmrc/2fbddeb8ca8df25356d8acc3d297e955 to your computer and use it in GitHub Desktop.
Python: turn sync functions into async functions (and async functions into sync ones)
import functools
import asyncio
from concurrent.futures import ThreadPoolExecutor
def to_async(func):
    """Wrap a blocking callable so that it can be awaited.

    The returned coroutine function forwards its positional and keyword
    arguments to ``func`` and runs the call through
    ``loop.run_in_executor``.

    Two extra keyword-only arguments are consumed by the wrapper and are
    not forwarded to ``func``:

    * ``loop`` -- event loop to use; when ``None`` the loop returned by
      ``asyncio.get_event_loop()`` is used.
    * ``executor`` -- handed to ``run_in_executor`` unchanged (``None``
      selects the loop's default executor).
    """

    @functools.wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        # run_in_executor only passes positional arguments through, so the
        # whole call is frozen into a zero-argument partial first.
        bound_call = functools.partial(func, *args, **kwargs)
        result = await target_loop.run_in_executor(executor, bound_call)
        return result

    return run
def to_async_thread(fn):
    """Make a blocking callable awaitable by running it on a worker thread.

    A dedicated ``ThreadPoolExecutor`` is created once per decorated
    function. Each call to the returned wrapper submits ``fn`` to that pool
    immediately and returns an asyncio future (not a coroutine) wrapping the
    pending result.
    """
    executor = ThreadPoolExecutor()

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # The job starts as soon as it is submitted; wrap_future only
        # adapts the concurrent future so that it can be awaited.
        return asyncio.wrap_future(executor.submit(fn, *args, **kwargs))

    return wrapper
def to_sync(fn):
    """Turn an async function into a blocking, synchronous one.

    The returned wrapper calls ``fn`` with the given arguments. If the
    result is a coroutine, it is driven to completion on the current
    thread's event loop and its result is returned; any other return value
    is passed through unchanged, so plain functions may be decorated too.

    When the current thread has no event loop (for example a worker thread,
    or after ``asyncio.run()`` has cleared it), a new loop is created and
    installed as the thread's current loop instead of failing.

    Raises:
        RuntimeError: propagated from ``run_until_complete`` if the wrapper
            is called while the event loop is already running.
    """

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if not asyncio.iscoroutine(res):
            return res
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop for this thread: create one and keep it as the
            # thread's loop so that later calls reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(res)

    return wrapper
import functools
def force_async(fn):
    """Make a blocking callable awaitable by running it on a worker thread.

    One ``ThreadPoolExecutor`` is created per decorated function. Calling
    the returned wrapper submits ``fn`` to that pool right away and returns
    an asyncio future wrapping the pending result.
    """
    import asyncio
    import concurrent.futures

    workers = concurrent.futures.ThreadPoolExecutor()

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        pending = workers.submit(fn, *args, **kwargs)
        # Adapt the concurrent future so that it can be awaited.
        return asyncio.wrap_future(pending)

    return wrapper
def force_sync(fn):
    """Turn an async function into a blocking, synchronous one.

    The returned wrapper calls ``fn`` with the given arguments. If the
    result is a coroutine, it is driven to completion on the current
    thread's event loop and its result is returned; any other return value
    is passed through unchanged, so plain functions may be decorated too.

    When the current thread has no event loop (for example a worker thread,
    or after ``asyncio.run()`` has cleared it), a new loop is created and
    installed as the thread's current loop instead of failing.

    Raises:
        RuntimeError: propagated from ``run_until_complete`` if the wrapper
            is called while the event loop is already running.
    """
    import asyncio

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if not asyncio.iscoroutine(res):
            return res
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop for this thread: create one and keep it as the
            # thread's loop so that later calls reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(res)

    return wrapper
import unittest
from async import *
class AsyncCanTurnAsyncIntoSyncFunction(unittest.TestCase):
    """``force_sync`` must yield a plain callable for async and sync input."""

    def test_turn_async_to_sync(self):
        # A coroutine function becomes callable without ``await``.
        import asyncio

        @force_sync
        async def compute():
            await asyncio.sleep(0.1)
            return 1 + 2

        outcome = compute()
        self.assertEqual(outcome, 3)

    def test_turn_sync_to_sync(self):
        # An ordinary function is expected to keep working unchanged.
        @force_sync
        def compute():
            return 1 + 2

        outcome = compute()
        self.assertEqual(outcome, 3)
class AsyncCanTurnSyncIntoAsyncFunction(unittest.TestCase):
    """Checks that ``force_async`` lets blocking calls overlap in time."""

    def test_turn_sync_to_async(self):
        import asyncio
        import time

        # Keep the job count at 5: assuming force_async uses a default
        # ThreadPoolExecutor, at least 5 workers are available regardless of
        # CPU count, so every job can start at once. A larger count would
        # make the timing depend on the machine running the test.
        job_count = 5
        delay = 0.2

        @force_async
        def test():
            time.sleep(delay)
            return True

        @force_sync
        async def main():
            # Start every job before awaiting any of them.
            futures = [test() for _ in range(job_count)]
            return await asyncio.gather(*futures)

        # monotonic() is unaffected by system clock adjustments.
        start = time.monotonic()
        res = main()
        elapsed = time.monotonic() - start

        self.assertEqual(len(res), job_count)
        self.assertTrue(all(res))
        # Sequential execution would need job_count * delay seconds;
        # overlapping execution should finish in roughly one delay.
        self.assertLess(elapsed, job_count * delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment