Created
September 2, 2021 08:21
-
-
Save lihuanshuai/1fa709a10ee884ff08f851f187f6cd5d to your computer and use it in GitHub Desktop.
Code to test asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import inspect | |
async def test_coro(): | |
await asyncio.sleep(1) | |
print('coro') | |
return 1 | |
def test_sync(): | |
print('sync') | |
return 2 | |
async def run(func, *args, **kwargs): | |
print('run', *args, **kwargs) | |
r = func(*args, **kwargs) | |
if not inspect.isawaitable(r): | |
return r | |
task = asyncio.create_task(r) | |
await task | |
return task.result() | |
async def test_async(): | |
print('async') | |
r = await run(test_sync) | |
print(r) | |
r = await run(test_coro) | |
print(r) | |
def main(): | |
asyncio.get_event_loop().run_until_complete(test_async()) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment