@progrium
Last active February 18, 2016 13:36
11 lines that make using asyncio libraries interactively nice again
$ python3 -i shellasync.py
>>> import websockets
>>> websockets.connect("ws://localhost:8000/echo")
<websockets.client.WebSocketClientProtocol object at 0x1041a34e0>
>>> ws = _  # only caveat: the result has to be assigned afterwards, from _
>>> ws.send("Hello world")
None
>>> ws.recv()
'Hello world'
>>>
>>> @coroutine
... def echo_again():
...     yield from ws.send("Hello again")
...     reply = yield from ws.recv()
...     return reply
...
>>> echo_again()
'Hello again'
>>>
>>> sleep(1) # yields to background tasks (servers, etc)
import sys
import pprint
from asyncio import *
def asynchook(v):
    import builtins
    if iscoroutine(v):
        builtins._ = get_event_loop().run_until_complete(v)
        pprint.pprint(builtins._)
    else:
        sys.__displayhook__(v)
sys.displayhook = asynchook
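
The hook above relies on get_event_loop() to supply a loop implicitly, which newer Python releases have deprecated, and the sessions use the generator-based @coroutine decorator, which was removed in Python 3.11. The following is a minimal sketch of the same idea for newer interpreters, assuming a dedicated loop created up front with asyncio.new_event_loop(). The names _loop and async_displayhook are illustrative and not part of the original gist.

```python
import asyncio
import builtins
import pprint
import sys

# Assumption: one dedicated loop for the whole interactive session,
# created explicitly rather than obtained from get_event_loop().
_loop = asyncio.new_event_loop()


def async_displayhook(value):
    """Run coroutine objects to completion before echoing them."""
    if asyncio.iscoroutine(value):
        # Store the result in `_`, as the regular REPL does for any value.
        builtins._ = _loop.run_until_complete(value)
        pprint.pprint(builtins._)
    else:
        # Anything else goes through the interpreter's default hook.
        sys.__displayhook__(value)


sys.displayhook = async_displayhook
```

Loaded with python3 -i, this should behave like the original hook: entering a call to an async def function at the prompt runs it on _loop and prints its return value, which is then available as _.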
$ python3
>>> import asyncio
>>> import websockets
>>> loop = asyncio.get_event_loop()
>>> co = websockets.connect("ws://localhost:8000/echo")
>>> ws = loop.run_until_complete(co)
>>> loop.run_until_complete(ws.send("Hello world"))
>>> loop.run_until_complete(ws.recv())
'Hello world'
>>>
>>> @asyncio.coroutine
... def echo_again():
...     yield from ws.send("Hello again")
...     reply = yield from ws.recv()
...     return reply
...
>>> loop.run_until_complete(echo_again())
'Hello again'
>>>
>>> loop.run_until_complete(asyncio.sleep(1)) # yields to background tasks (servers, etc)