@antibagr
Created October 7, 2021 00:55
Asyncio with threads explained
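import asyncio
import threading
import time


async def coro(name: str) -> str:
    await asyncio.sleep(0.001)
    print(f'[Coroutine] Called with {name=}')
    return name


def call_coro_from_thread() -> None:
    print('[Second] I\'m going to call a coroutine in the first thread now!')
    # Schedule the coroutine on the loop that runs in the first thread.
    # The returned concurrent.futures.Future is ignored here, so this
    # thread does not wait for the coroutine to finish.
    asyncio.run_coroutine_threadsafe(coro('foo'), loop=loop)
    print('[Second] Done. Closing second thread.')


async def main() -> None:
    for _ in range(3):
        print('[First] Event loop is running')
        await asyncio.sleep(0.1)
    print('[First] Closing event loop and first thread')


# Create the loop explicitly: asyncio.get_event_loop() is deprecated
# when no event loop is running.
loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
threads: list[threading.Thread] = [
    # First thread: runs the event loop until main() completes.
    threading.Thread(target=loop.run_until_complete, args=(main(),)),
    # Second thread: submits a coroutine to the loop in the first thread.
    threading.Thread(target=call_coro_from_thread),
]

print('[Main] Hello. I\'m going to launch all threads.')
for th in threads:
    th.start()

print('[Main] thread is here and it is still free!')
time.sleep(0.3)

print('[Main] Closing all threads.')
for th in threads:
    th.join()
# The loop has stopped once the first thread is joined, so it is safe to close.
loop.close()
print('[Main] Goodbye!')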
antibagr commented Oct 7, 2021

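Running the script prints something like this (the exact interleaving of messages from different threads can vary between runs):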

$ python asyncio_with_threads.py
[Main] Hello. I'm going to launch all threads.
[First] Event loop is running
[Second] I'm going to call a coroutine in the first thread now!
[Main] thread is here and it is still free!
[Second] Done. Closing second thread.
[Coroutine] Called with name='foo'
[First] Event loop is running
[First] Event loop is running
[Main] Closing all threads.
[First] Closing event loop and first thread
[Main] Goodbye!
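
In the output above, the second thread prints "Done" before the coroutine has run, because the script ignores the object returned by `asyncio.run_coroutine_threadsafe`. A minimal sketch of how the calling thread could wait for the coroutine's return value instead, assuming the loop is driven with `run_forever` in a dedicated thread (the `loop_thread` name and the one-second timeout are illustrative, not part of the original script):

```python
import asyncio
import threading


async def coro(name: str) -> str:
    await asyncio.sleep(0.001)
    return name


# Run the event loop in a separate thread until stop() is requested.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever)
loop_thread.start()

# run_coroutine_threadsafe returns a concurrent.futures.Future,
# not an asyncio.Future, so it can be waited on from a plain thread.
future = asyncio.run_coroutine_threadsafe(coro('foo'), loop)

# Block the calling thread until the coroutine finishes (or 1 s passes).
result = future.result(timeout=1)
print(f'[Main] Coroutine returned {result!r}')

# loop.stop() is not thread-safe, so schedule it on the loop's own thread.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

Blocking on `future.result()` is only appropriate in a thread other than the one running the loop; calling it from inside the loop's thread would deadlock, since the loop could never advance the coroutine.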
