Skip to content

Instantly share code, notes, and snippets.

@ashu-tosh-kumar
Created July 31, 2023 15:26
Show Gist options
  • Save ashu-tosh-kumar/2cf677aa2ce287c6695c119da858b477 to your computer and use it in GitHub Desktop.
Save ashu-tosh-kumar/2cf677aa2ce287c6695c119da858b477 to your computer and use it in GitHub Desktop.
Call a synchronous function asynchronously in Python
# This example assumes the asyncio event loop. If you use a different event loop library, import that library instead.
import asyncio
import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
# Module-level process pool used by sync_in_async: blocking callables are
# submitted to it so they execute in a worker process rather than on the
# event loop. Because work crosses a process boundary, the callable and its
# arguments have to be picklable.
executor = ProcessPoolExecutor()
# synchronous function
def sync_func() -> str:
    """Stand in for a blocking call.

    Prints a marker line, blocks the calling thread for 10 seconds with
    ``time.sleep``, and then returns.

    Returns:
        The literal string ``"sync"``.
    """
    print("sync function: sleeping")
    # Deliberately blocking: this is what must not run on the event loop.
    time.sleep(10)
    return "sync"
# asynchronous function
async def async_func() -> str:
    """Print a marker line and return immediately.

    The coroutine contains no ``await``, so once it starts it runs to
    completion without yielding control.

    Returns:
        The literal string ``"async"``.
    """
    print("async function")
    return "async"
# sync_in_async allows running sync code asynchronously
async def sync_in_async(f, *args, **kwargs):
    """Run the blocking callable ``f`` in the module-level ``executor`` and await it.

    Args:
        f: Synchronous callable to execute off the event loop. With the
            process pool created in this file, ``f`` and its arguments must
            be picklable.
        *args: Positional arguments forwarded to ``f``.
        **kwargs: Keyword arguments forwarded to ``f``.

    Returns:
        Whatever ``f(*args, **kwargs)`` returns.

    Raises:
        Any exception raised by ``f`` is re-raised here when the result is
        awaited.
    """
    # Inside a coroutine a loop is always running; get_running_loop() returns
    # exactly that loop and never creates one implicitly. This is asyncio-specific.
    loop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so bind both the
    # positional and keyword arguments up front with functools.partial.
    bound_call = functools.partial(f, *args, **kwargs)
    return await loop.run_in_executor(executor, bound_call)
# CODE FOR TESTING ABOVE CODE
# Running this code should call the async function without waiting for the sync function to complete
async def main():
    """Demonstrate that the blocking function does not stall the event loop.

    Both calls are wrapped in tasks before either is awaited, so the
    coroutine ``async_func`` can run while ``sync_func`` is still executing
    in the executor. The two return values are printed once both tasks
    have finished.
    """
    # calling sync function asynchronously
    sync_task = asyncio.create_task(sync_in_async(sync_func))
    # calling async function asynchronously
    async_task = asyncio.create_task(async_func())

    # Both tasks are already scheduled; awaiting only collects their results.
    sync_val = await sync_task
    async_val = await async_task

    print("sync function return value: ", sync_val)
    print("async function return value: ", async_val)
# Script entry point. The guard also keeps worker processes that re-import
# this module (e.g. under the "spawn" start method) from re-running the demo.
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on newer Python versions.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment