@schlamar
Last active December 19, 2015 19:49
Tornado's IOStream on top of tulip's Transport/Protocol API.
import re
import tulip
import tulipiostream
loop = tulip.get_event_loop()
LENGTH_PATTERN = re.compile(br'Content-Length: (\d+)\r\n')
REQ = b'''GET / HTTP/1.1
Host: 127.0.0.1:8000
Accept: text/html
Accept-Encoding: gzip,deflate,sdch
Accept-Language: de-DE
\r\n\r\n'''
def parse_content_length(header):
    # don't use this for serious code :)
    for match in LENGTH_PATTERN.finditer(header):
        return int(match.group(1))
@tulip.coroutine
def main():
    conn_f = loop.create_connection(tulipiostream.AsyncTulipIOStream,
                                    '127.0.0.1', 8000)
    _, stream = yield from conn_f
    stream.write(REQ)

    header = yield from stream.read_until(b'\r\n\r\n')
    print(header)

    length = parse_content_length(header)
    if length:
        body_f = stream.read_bytes(length)
    else:
        body_f = stream.read_until_close()
    body = yield from body_f
    if length:
        assert len(body) == length
    print(body)


if __name__ == '__main__':
    loop.run_until_complete(main())
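
The script above assumes an HTTP server is already listening on 127.0.0.1:8000 (for a quick local test, python -m http.server 8000 will do). Its future-returning read_* calls come from AsyncTulipIOStream, defined at the bottom of the module. The following sketch is not part of the gist; it shows roughly what that wrapper automates by driving the plain callback API of the base TulipIOStream class with a manually created tulip.Future (fetch_header is a made-up name):

import tulip
import tulipiostream

loop = tulip.get_event_loop()

@tulip.coroutine
def fetch_header():
    # TulipIOStream itself is the protocol factory handed to create_connection.
    _, stream = yield from loop.create_connection(
        tulipiostream.TulipIOStream, '127.0.0.1', 8000)
    stream.write(b'GET / HTTP/1.1\r\nHost: 127.0.0.1:8000\r\n\r\n')
    # read_until() takes a callback; completing a Future from that callback
    # gives the same yield-from style that async_wrapper provides for free.
    done = tulip.Future()
    stream.read_until(b'\r\n\r\n', done.set_result)
    header = yield from done
    print(header)

if __name__ == '__main__':
    loop.run_until_complete(fetch_header())

The module imported above as tulipiostream (the gist's second file, tulipiostream.py) follows.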
import collections
import functools
import numbers
import re
import sys
import tulip
from tulip.log import tulip_log
class TulipIOStream(object):
    """A Tulip protocol class to write to and read from a tulip transport.

    We support a non-blocking ``write()`` and a family of ``read_*()``
    methods. All of the methods take callbacks (since writing and reading
    are non-blocking and asynchronous).

    When a transport is closed due to an error, the IOStream's ``error``
    attribute contains the exception object.
    """

    def __init__(self, loop=None, max_buffer_size=None):
        self.loop = loop or tulip.get_event_loop()
        self.max_buffer_size = max_buffer_size or 104857600
        self.transport = None
        self.error = None
        self._read_buffer = collections.deque()
        self._read_buffer_size = 0
        self._read_delimiter = None
        self._read_regex = None
        self._read_bytes = None
        self._read_until_close = False
        self._read_callback = None
        self._streaming_callback = None
        self._close_callback = None
        self._closed = False
    def connection_made(self, transport):
        self.transport = transport

    def connection_lost(self, exc):
        if exc is None:
            self.close()
        else:
            # Pass a real exc_info tuple so close() records it in self.error.
            self.close(exc_info=(type(exc), exc, exc.__traceback__))

    def eof_received(self):
        self.transport.close()

    def data_received(self, chunk):
        self._read_buffer.append(chunk)
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size >= self.max_buffer_size:
            tulip_log.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        self._read_from_buffer()
    def read_until_regex(self, regex, callback):
        """Run ``callback`` when we read the given regex pattern.

        The callback will get the data read (including the data that
        matched the regex and anything that came before it) as an argument.
        """
        self._set_read_callback(callback)
        self._read_regex = re.compile(regex)
        self._try_inline_read()

    def read_until(self, delimiter, callback):
        """Run ``callback`` when we read the given delimiter.

        The callback will get the data read (including the delimiter)
        as an argument.
        """
        self._set_read_callback(callback)
        self._read_delimiter = delimiter
        self._try_inline_read()
    def read_bytes(self, num_bytes, callback, streaming_callback=None):
        """Run ``callback`` when we read the given number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the argument to the final
        ``callback`` will be empty. Otherwise, the ``callback`` gets
        the data as an argument.
        """
        self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._streaming_callback = streaming_callback
        self._try_inline_read()

    def read_until_close(self, callback, streaming_callback=None):
        """Reads all data from the socket until it is closed.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the argument to the final
        ``callback`` will be empty. Otherwise, the ``callback`` gets the
        data as an argument.

        Subject to ``max_buffer_size`` limit from `IOStream` constructor if
        a ``streaming_callback`` is not used.
        """
        self._set_read_callback(callback)
        self._streaming_callback = streaming_callback
        if self.closed():
            if self._streaming_callback is not None:
                self._run_callback(self._streaming_callback,
                                   self._consume(self._read_buffer_size))
            self._run_callback(self._read_callback,
                               self._consume(self._read_buffer_size))
            self._streaming_callback = None
            self._read_callback = None
            return
        self._read_until_close = True
        self._streaming_callback = streaming_callback
        self._try_inline_read()
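
    # Illustrative sketch (not part of the original gist) of how the
    # streaming_callback variants described above are meant to be used.
    # on_chunk and on_done are hypothetical names; when a streaming callback
    # is supplied, the final callback receives an empty byte string.
    #
    #     def on_chunk(chunk):
    #         print('chunk of', len(chunk), 'bytes')
    #
    #     def on_done(remainder):
    #         assert remainder == b''
    #
    #     stream.read_until_close(on_done, streaming_callback=on_chunk)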
    def write(self, data, callback=None):
        """Write the given data to this stream.

        ``callback`` is accepted for API compatibility with Tornado's
        IOStream but is currently ignored: the data is handed straight to
        the tulip transport, which does its own write buffering.
        """
        self.transport.write(data)

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback
    def close(self, exc_info=False):
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_callback(self._streaming_callback,
                                       self._consume(self._read_buffer_size))
                callback = self._read_callback
                self._read_callback = None
                self._read_until_close = False
                self._run_callback(callback,
                                   self._consume(self._read_buffer_size))
            self._closed = True
        if self._close_callback:
            cb = self._close_callback
            self._close_callback = None
            self._run_callback(cb)
            # Delete any unfinished callbacks to break up reference cycles.
            self._read_callback = None

    def closed(self):
        """Returns true if the stream has been closed."""
        return self._closed
    def _run_callback(self, callback, *args):
        self.loop.call_soon(callback, *args)

    def _set_read_callback(self, callback):
        assert not self._read_callback, "Already reading"
        self._read_callback = callback
    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next loop iteration; otherwise the read
        will complete later, once ``data_received()`` delivers more data.
        """
        # See if we've already got the data from a previous read.
        if self._read_from_buffer():
            return
    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        if self._streaming_callback is not None and self._read_buffer_size:
            bytes_to_consume = self._read_buffer_size
            if self._read_bytes is not None:
                bytes_to_consume = min(self._read_bytes, bytes_to_consume)
                self._read_bytes -= bytes_to_consume
            self._run_callback(self._streaming_callback,
                               self._consume(bytes_to_consume))
        if self._read_bytes is not None and \
                self._read_buffer_size >= self._read_bytes:
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._streaming_callback = None
            self._read_bytes = None
            self._run_callback(callback, self._consume(num_bytes))
            return True
        elif self._read_delimiter is not None:
            # Multi-byte delimiters (e.g. '\r\n') may straddle two
            # chunks in the read buffer, so we can't easily find them
            # without collapsing the buffer. However, since protocols
            # using delimited reads (as opposed to reads of a known
            # length) tend to be "line" oriented, the delimiter is likely
            # to be in the first few chunks. Merge the buffer gradually
            # since large merges are relatively expensive and get undone in
            # consume().
            if self._read_buffer:
                while True:
                    loc = self._read_buffer[0].find(self._read_delimiter)
                    if loc != -1:
                        callback = self._read_callback
                        delimiter_len = len(self._read_delimiter)
                        self._read_callback = None
                        self._streaming_callback = None
                        self._read_delimiter = None
                        self._run_callback(callback,
                                           self._consume(loc + delimiter_len))
                        return True
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
        elif self._read_regex is not None:
            if self._read_buffer:
                while True:
                    m = self._read_regex.search(self._read_buffer[0])
                    if m is not None:
                        callback = self._read_callback
                        self._read_callback = None
                        self._streaming_callback = None
                        self._read_regex = None
                        self._run_callback(callback, self._consume(m.end()))
                        return True
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
        return False

    def _consume(self, loc):
        if loc == 0:
            return b""
        _merge_prefix(self._read_buffer, loc)
        self._read_buffer_size -= loc
        return self._read_buffer.popleft()
def async_wrapper(func):
    @functools.wraps(func)
    def wrapper(stream, *args):
        f = tulip.Future()
        func(stream, *args, callback=f.set_result)
        stream.set_close_callback(f.cancel)
        return f
    return wrapper


class AsyncTulipIOStream(TulipIOStream):
    read_until_regex = async_wrapper(TulipIOStream.read_until_regex)
    read_until = async_wrapper(TulipIOStream.read_until)
    read_bytes = async_wrapper(TulipIOStream.read_bytes)
    read_until_close = async_wrapper(TulipIOStream.read_until_close)
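
# Note (not part of the original gist): async_wrapper registers f.cancel as
# the stream's close callback, so a read that is still pending when the
# transport closes resolves to a cancelled future, and yield-from raises a
# cancellation error instead of returning data.  Rough sketch of defensive
# use in a caller:
#
#     body_f = stream.read_bytes(length)
#     try:
#         body = yield from body_f
#     except tulip.CancelledError:  # assumption: tulip exposes CancelledError
#         body = None               # peer closed before `length` bytes arrived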
def _double_prefix(deque):
    """Grow by doubling, but don't split the second chunk just because the
    first one is small.
    """
    new_len = max(len(deque[0]) * 2,
                  (len(deque[0]) + len(deque[1])))
    _merge_prefix(deque, new_len)


def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
    >>> _merge_prefix(d, 5); print(d)
    deque(['abcde', 'fghi', 'j'])

    Strings will be split as necessary to reach the desired size.

    >>> _merge_prefix(d, 7); print(d)
    deque(['abcdefg', 'hi', 'j'])
    >>> _merge_prefix(d, 3); print(d)
    deque(['abc', 'defg', 'hi', 'j'])
    >>> _merge_prefix(d, 100); print(d)
    deque(['abcdefghij'])
    """
    if len(deque) == 1 and len(deque[0]) <= size:
        return
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    # This data structure normally just contains byte strings, but
    # the unittest gets messy if it doesn't use the default str() type,
    # so do the merge based on the type of data that's actually present.
    if prefix:
        deque.appendleft(type(prefix[0])().join(prefix))
    if not deque:
        deque.appendleft(b"")
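
# Worked example (not part of the original gist) of why _read_from_buffer
# merges the buffer prefix before searching: a delimiter that straddles two
# received chunks is only found after _double_prefix collapses them.
#
#     >>> d = collections.deque([b'abc\r', b'\ndef'])
#     >>> d[0].find(b'\r\n')
#     -1
#     >>> _double_prefix(d); print(d)
#     deque([b'abc\r\ndef'])
#     >>> d[0].find(b'\r\n')
#     3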