Skip to content

Instantly share code, notes, and snippets.

@bergundy
Last active December 19, 2015 06:49
Show Gist options
  • Save bergundy/5914398 to your computer and use it in GitHub Desktop.
Save bergundy/5914398 to your computer and use it in GitHub Desktop.
An example of how to use coroutines with IOStream.read and timeouts
import sys
import functools
import termios
import socket
from tornado import gen, ioloop, iostream
from datetime import timedelta
class StdinReader(object):
    """Read raw bytes from stdin through a Tornado ``PipeIOStream``.

    Constructing an instance switches the terminal attached to stdin into
    non-canonical, no-echo mode so that input can be consumed one byte at a
    time. The previous terminal settings are NOT restored by this class;
    callers that care should save and restore them around its use.
    """

    def __init__(self):
        fd = sys.stdin.fileno()
        # Set stdin to non-canonical and no echo mode
        # so we can read a byte at a time
        term = termios.tcgetattr(fd)
        # Index 3 of the attribute list holds the local-mode flags (lflag).
        term[3] &= ~termios.ICANON & ~termios.ECHO
        termios.tcsetattr(fd, termios.TCSANOW, term)
        self.stream = iostream.PipeIOStream(fd)

    @gen.coroutine
    def read(self, num_bytes, timeout=None):
        """Read exactly ``num_bytes`` bytes from stdin.

        Args:
            num_bytes: number of bytes to wait for.
            timeout: optional deadline handed unchanged to
                ``IOLoop.add_timeout`` (e.g. a ``datetime.timedelta``).
                A falsy value (``None``, zero) disables the timeout.

        Returns:
            The data produced by ``self.stream.read_bytes`` (delivered via
            ``gen.Return``).

        Raises:
            socket.timeout: raised by the timeout callback when the deadline
                passes first.
                NOTE(review): delivery of that exception into this coroutine
                relies on Tornado's stack_context propagation (the Tornado
                generation that still ships ``gen.Task``) -- confirm against
                the installed Tornado version.
        """
        io_loop = ioloop.IOLoop.current()
        ioloop_timeout = None
        if timeout:
            def on_timeout():
                # We now have a scheduled read which is not deliverable.
                # I chose to ignore this for simplicity.
                raise socket.timeout
            ioloop_timeout = io_loop.add_timeout(timeout, callback=on_timeout)
        try:
            data = yield gen.Task(self.stream.read_bytes, num_bytes)
        finally:
            # Always cancel the timer, including when the read is interrupted
            # by an exception; otherwise a stale timeout would fire later,
            # after this call has already finished.
            if ioloop_timeout is not None:
                io_loop.remove_timeout(ioloop_timeout)
        raise gen.Return(data)
if __name__ == "__main__":
    # StdinReader() switches the terminal to non-canonical, no-echo mode.
    # Remember the current settings so they can be put back however the
    # script ends (including an uncaught socket.timeout).
    stdin_fd = sys.stdin.fileno()
    saved_term = termios.tcgetattr(stdin_fd)
    try:
        r = StdinReader()
        # Each call reads a single byte and gives up after one second.
        read = functools.partial(r.read, num_bytes=1, timeout=timedelta(seconds=1))

        # Demo 1: drive a single read straight from run_sync.
        # print(...) with one argument behaves the same on Python 2 and 3.
        print(ioloop.IOLoop.current().run_sync(read))

        # Demo 2: several sequential reads from inside another coroutine.
        @gen.coroutine
        def read3():
            for _ in range(3):
                char = yield read()
                print(char)

        ioloop.IOLoop.current().run_sync(read3)
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_term)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment