Skip to content

Instantly share code, notes, and snippets.

@smurfix
Forked from vxgmichel/compat.py
Last active July 27, 2018 01:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smurfix/aa047536aa8bc0f15f3eb561610e10fb to your computer and use it in GitHub Desktop.
Save smurfix/aa047536aa8bc0f15f3eb561610e10fb to your computer and use it in GitHub Desktop.
Asyncio/Curio/Trio compatibility module
import asyncio
try:
import curio
except ImportError:
curio = None
try:
import trio
except ImportError:
trio = None
def _trio_current_task():
    """Return Trio's current task object.

    Trio 0.15 renamed ``trio.hazmat`` to ``trio.lowlevel`` and later releases
    removed the old name, so ``lowlevel`` is preferred and ``hazmat`` is only
    used as a fallback for older installations.

    Raises:
        RuntimeError: propagated from Trio when there is no current Trio task
            (callers treat this as "not running under Trio").
    """
    lowlevel = getattr(trio, "lowlevel", None) or trio.hazmat
    return lowlevel.current_task()


def _asyncio_loop_is_running():
    """Return True when asyncio reports a running event loop here."""
    try:
        return asyncio.get_event_loop().is_running()
    except RuntimeError:
        # asyncio could not provide an event loop for this context.
        return False


async def get_async_module():
    """Return the module of the async framework driving this coroutine.

    The frameworks are probed in a fixed order: Trio, then Curio, then
    asyncio.  Optional frameworks that failed to import (bound to ``None``)
    are skipped.

    Returns:
        The ``trio``, ``curio`` or ``asyncio`` module.

    Raises:
        RuntimeError: if none of the supported frameworks is detected.
    """
    if trio is not None:
        try:
            inside_trio = bool(_trio_current_task())
        except RuntimeError:
            inside_trio = False
        if inside_trio:
            # NOTE(review): a *running* asyncio loop inside a Trio task is
            # presumably a trio-asyncio loop -- confirm against that library.
            # Requiring is_running() (as the final asyncio probe does) keeps
            # an idle default loop from masking plain Trio.
            if _asyncio_loop_is_running():
                return asyncio
            return trio

    if curio is not None:
        try:
            if await curio.current_task():
                return curio
        except RuntimeError:
            # Treated as "not running under Curio"; fall through to asyncio.
            pass

    if _asyncio_loop_is_running():
        return asyncio

    raise RuntimeError(
        'The current asynchronous framework is not supported')
# Framework-agnostic sleep
async def sleep(seconds):
    """Sleep for ``seconds`` using the detected framework's own ``sleep``.

    The framework module is looked up on every call via
    ``get_async_module()``; whatever its ``sleep`` coroutine produces is
    returned to the caller.
    """
    backend = await get_async_module()
    outcome = await backend.sleep(seconds)
    return outcome
# Testing
def test_trio():
    """Drive the framework-agnostic ``sleep`` for one second via ``trio.run``."""
    duration = 1
    trio.run(sleep, duration)
def test_curio():
    """Drive the framework-agnostic ``sleep`` for one second via ``curio.run``."""
    nap = sleep(1)
    curio.run(nap)
def test_asyncio():
    """Drive the framework-agnostic ``sleep`` for one second on an asyncio loop.

    A dedicated loop is created instead of calling
    ``asyncio.get_event_loop()``, which is deprecated when no loop is current
    and raises ``RuntimeError`` on the newest interpreters.  The loop is
    closed afterwards so the test does not leak it.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(sleep(1))
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment