Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created July 26, 2018 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vxgmichel/6710f4ae819a74224425616f491e032b to your computer and use it in GitHub Desktop.
Save vxgmichel/6710f4ae819a74224425616f491e032b to your computer and use it in GitHub Desktop.
Asyncio/Curio/Trio compatibility module
import asyncio
try:
import curio
except ImportError:
curio = None
try:
import trio
except ImportError:
trio = None
async def get_async_module():
    """Return the module of the async framework running the current task.

    The frameworks are probed in this order: trio, curio, asyncio. A
    ``RuntimeError`` raised by a probe is treated as "not running under
    this framework" and the next probe is tried. trio and curio are only
    probed when their module-level name is truthy (i.e. the optional
    import succeeded).

    Returns:
        The ``trio``, ``curio`` or ``asyncio`` module.

    Raises:
        RuntimeError: If none of the probes identifies the running framework.
    """
    if trio:
        try:
            # ``trio.hazmat`` was renamed to ``trio.lowlevel`` and the old
            # name was later removed; prefer the new name and fall back to
            # the old one for older trio releases.
            trio_lowlevel = getattr(trio, "lowlevel", None) or trio.hazmat
            if trio_lowlevel.current_task():
                return trio
        except RuntimeError:
            pass

    if curio:
        try:
            if await curio.current_task():
                return curio
        except RuntimeError:
            pass

    try:
        if hasattr(asyncio, "get_running_loop"):
            # Raises RuntimeError when no asyncio loop is running, and never
            # creates a loop as a side effect (unlike ``get_event_loop``).
            asyncio.get_running_loop()
            return asyncio
        # Fallback for Python versions without ``get_running_loop``.
        if asyncio.get_event_loop().is_running():
            return asyncio
    except RuntimeError:
        pass

    raise RuntimeError(
        'The current asynchronous framework is not supported')
# Framework-agnostic sleep
async def sleep(seconds):
    """Sleep for ``seconds`` using whichever async framework is running.

    The framework module is resolved with ``get_async_module`` and the
    call is delegated to that module's own ``sleep`` coroutine; its result
    is returned unchanged.
    """
    framework = await get_async_module()
    outcome = await framework.sleep(seconds)
    return outcome
# Testing
def test_trio():
    """Run the framework-agnostic ``sleep`` for one second under trio."""
    delay = 1
    # trio.run takes the async function and its arguments separately.
    trio.run(sleep, delay)
def test_curio():
    """Run the framework-agnostic ``sleep`` for one second under curio."""
    # Here curio.run is handed an already-created coroutine object.
    pending = sleep(1)
    curio.run(pending)
def test_asyncio():
    """Run the framework-agnostic ``sleep`` for one second under asyncio."""
    # Use a dedicated loop instead of ``asyncio.get_event_loop()``: implicit
    # loop creation outside a running loop is deprecated, and the original
    # never closed the loop it obtained.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(sleep(1))
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment