Last active
December 19, 2015 19:49
-
-
Save schlamar/6009112 to your computer and use it in GitHub Desktop.
Tornado's IOStream on top of tulip's Transport/Protocol API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import tulip | |
import tulipiostream | |
loop = tulip.get_event_loop() | |
# Regex for pulling the Content-Length value out of a raw HTTP header blob.
# Fixed: the original pattern was a non-raw bytes literal containing ``\d``
# (an invalid escape sequence) and used ``\d*``, which can match zero digits
# and make ``int(match.group(1))`` raise ValueError on ``b''``.
LENGTH_PATTERN = re.compile(br'Content-Length: (\d+)\r\n')

# Canned HTTP request. NOTE(review): the header lines are separated by bare
# LF (the triple-quoted literal's newlines), not CRLF as RFC 7230 requires;
# only lenient servers will accept this — confirm against the target server.
REQ = b'''GET / HTTP/1.1
Host: 127.0.0.1:8000
Accept: text/html
Accept-Encoding: gzip,deflate,sdch
Accept-Language: de-DE
\r\n\r\n'''
def parse_content_length(header):
    """Return the Content-Length value found in *header* as an int.

    Returns None when the header blob contains no Content-Length line.
    Not a real header parser -- don't use this for serious code :)
    """
    # The original looped over finditer() only to return the first hit;
    # re.search() expresses "first match" directly.
    match = LENGTH_PATTERN.search(header)
    if match is not None:
        return int(match.group(1))
    return None
@tulip.coroutine
def main():
    """Fetch / from 127.0.0.1:8000 and print the response header and body."""
    connect = loop.create_connection(tulipiostream.AsyncTulipIOStream,
                                     '127.0.0.1', 8000)
    _, stream = yield from connect
    stream.write(REQ)

    # The header ends at the first blank line (CRLF CRLF).
    header = yield from stream.read_until(b'\r\n\r\n')
    print(header)

    length = parse_content_length(header)
    if length:
        body = yield from stream.read_bytes(length)
        assert len(body) == length
    else:
        # No usable Content-Length: read until the server closes.
        body = yield from stream.read_until_close()
    print(body)
if __name__ == '__main__':
    # Drive the client coroutine to completion on the module-level loop.
    loop.run_until_complete(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import functools | |
import numbers | |
import re | |
import sys | |
import tulip | |
from tulip.log import tulip_log | |
class TulipIOStream(object):
    """A Tulip protocol class to write to and read from a tulip transport.

    We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
    All of the methods take callbacks (since writing and reading are
    non-blocking and asynchronous).

    When a transport is closed due to an error, the IOStream's ``error``
    attribute contains the exception object.
    """

    def __init__(self, loop=None, max_buffer_size=None):
        # Event loop used to schedule all callbacks via call_soon().
        self.loop = loop or tulip.get_event_loop()
        # Cap on buffered-but-unread data; default is 100 MiB.
        self.max_buffer_size = max_buffer_size or 104857600
        # Set by connection_made(); None until the transport is attached.
        self.transport = None
        # Exception that closed the stream, if any (set in close()).
        self.error = None
        # Received-but-unconsumed chunks, oldest first, plus running total.
        self._read_buffer = collections.deque()
        self._read_buffer_size = 0
        # At most one read mode is pending at a time (enforced by
        # _set_read_callback); these markers are set by the read_*()
        # methods and cleared when the read completes.
        self._read_delimiter = None
        self._read_regex = None
        self._read_bytes = None
        self._read_until_close = False
        self._read_callback = None
        self._streaming_callback = None
        self._close_callback = None
        self._closed = False

    def connection_made(self, transport):
        # tulip Protocol hook: remember the transport for write()/close().
        self.transport = transport

    def connection_lost(self, exc):
        # tulip Protocol hook. NOTE(review): exc (an exception instance or
        # None) is forwarded as close()'s exc_info argument; since it is not
        # a tuple, close() replaces it with sys.exc_info() — verify this is
        # the intended way to record the error.
        self.close(exc)

    def eof_received(self):
        # Peer finished sending: close our side too, which leads to
        # connection_lost() and thus close().
        self.transport.close()

    def data_received(self, chunk):
        # tulip Protocol hook: buffer the chunk and try to satisfy the
        # currently-pending read, if any.
        self._read_buffer.append(chunk)
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size >= self.max_buffer_size:
            tulip_log.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        self._read_from_buffer()

    def read_until_regex(self, regex, callback):
        """Run ``callback`` when we read the given regex pattern.

        The callback will get the data read (including the data that
        matched the regex and anything that came before it) as an argument.
        """
        self._set_read_callback(callback)
        self._read_regex = re.compile(regex)
        self._try_inline_read()

    def read_until(self, delimiter, callback):
        """Run ``callback`` when we read the given delimiter.

        The callback will get the data read (including the delimiter)
        as an argument.
        """
        self._set_read_callback(callback)
        self._read_delimiter = delimiter
        self._try_inline_read()

    def read_bytes(self, num_bytes, callback, streaming_callback=None):
        """Run callback when we read the given number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the argument to the final
        ``callback`` will be empty. Otherwise, the ``callback`` gets
        the data as an argument.
        """
        self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._streaming_callback = streaming_callback
        self._try_inline_read()

    def read_until_close(self, callback, streaming_callback=None):
        """Reads all data from the socket until it is closed.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the argument to the final
        ``callback`` will be empty. Otherwise, the ``callback`` gets the
        data as an argument.

        Subject to ``max_buffer_size`` limit from `IOStream` constructor if
        a ``streaming_callback`` is not used.
        """
        self._set_read_callback(callback)
        self._streaming_callback = streaming_callback
        if self.closed():
            # Stream already closed: deliver whatever is buffered right away.
            if self._streaming_callback is not None:
                self._run_callback(self._streaming_callback,
                                   self._consume(self._read_buffer_size))
            self._run_callback(self._read_callback,
                               self._consume(self._read_buffer_size))
            self._streaming_callback = None
            self._read_callback = None
            return
        self._read_until_close = True
        # NOTE(review): redundant — _streaming_callback was already assigned
        # above; kept byte-identical.
        self._streaming_callback = streaming_callback
        self._try_inline_read()

    def write(self, data, callback=None):
        """Write the given data to this stream.

        If ``callback`` is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        NOTE(review): ``callback`` is currently ignored — the write is
        delegated directly to the transport, which buffers internally.
        """
        self.transport.write(data)

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback

    def close(self, exc_info=False):
        """Close the stream, flushing any pending read_until_close().

        If ``exc_info`` is truthy (a tuple, or anything else that makes
        ``sys.exc_info()`` be consulted), the ``error`` attribute is set
        to the exception value.
        """
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                # A read_until_close() is pending: hand over the remaining
                # buffered data before marking the stream closed.
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_callback(self._streaming_callback,
                                       self._consume(self._read_buffer_size))
                callback = self._read_callback
                self._read_callback = None
                self._read_until_close = False
                self._run_callback(callback,
                                   self._consume(self._read_buffer_size))
            self._closed = True
        if self._close_callback:
            cb = self._close_callback
            self._close_callback = None
            self._run_callback(cb)
        # Delete any unfinished callbacks to break up reference cycles.
        self._read_callback = None

    def closed(self):
        """Returns true if the stream has been closed."""
        return self._closed

    def _run_callback(self, callback, *args):
        # All user callbacks are deferred to the next loop iteration.
        self.loop.call_soon(callback, *args)

    def _set_read_callback(self, callback):
        # Only one read may be in flight at a time.
        assert not self._read_callback, "Already reading"
        self._read_callback = callback

    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read.  If not,
        # nothing else to do here: data_received() calls _read_from_buffer()
        # again as more chunks arrive.
        if self._read_from_buffer():
            return

    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        # Streaming mode: hand over everything available (capped at
        # _read_bytes when a byte count was requested).
        if self._streaming_callback is not None and self._read_buffer_size:
            bytes_to_consume = self._read_buffer_size
            if self._read_bytes is not None:
                bytes_to_consume = min(self._read_bytes, bytes_to_consume)
                self._read_bytes -= bytes_to_consume
            self._run_callback(self._streaming_callback,
                               self._consume(bytes_to_consume))
        if self._read_bytes is not None and \
                self._read_buffer_size >= self._read_bytes:
            # Fixed-size read satisfied.
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._streaming_callback = None
            self._read_bytes = None
            self._run_callback(callback, self._consume(num_bytes))
            return True
        elif self._read_delimiter is not None:
            # Multi-byte delimiters (e.g. '\r\n') may straddle two
            # chunks in the read buffer, so we can't easily find them
            # without collapsing the buffer. However, since protocols
            # using delimited reads (as opposed to reads of a known
            # length) tend to be "line" oriented, the delimiter is likely
            # to be in the first few chunks. Merge the buffer gradually
            # since large merges are relatively expensive and get undone in
            # consume().
            if self._read_buffer:
                while True:
                    loc = self._read_buffer[0].find(self._read_delimiter)
                    if loc != -1:
                        callback = self._read_callback
                        delimiter_len = len(self._read_delimiter)
                        self._read_callback = None
                        self._streaming_callback = None
                        self._read_delimiter = None
                        self._run_callback(callback,
                                           self._consume(loc + delimiter_len))
                        return True
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
        elif self._read_regex is not None:
            # Same gradual-merge strategy as the delimiter case, but with
            # a regex search instead of a substring find.
            if self._read_buffer:
                while True:
                    m = self._read_regex.search(self._read_buffer[0])
                    if m is not None:
                        callback = self._read_callback
                        self._read_callback = None
                        self._streaming_callback = None
                        self._read_regex = None
                        self._run_callback(callback, self._consume(m.end()))
                        return True
                    if len(self._read_buffer) == 1:
                        break
                    _double_prefix(self._read_buffer)
        return False

    def _consume(self, loc):
        # Remove and return the first ``loc`` bytes of the read buffer.
        if loc == 0:
            return b""
        _merge_prefix(self._read_buffer, loc)
        self._read_buffer_size -= loc
        return self._read_buffer.popleft()
def async_wrapper(func):
    """Adapt a callback-style ``read_*`` method to return a Future.

    The returned wrapper starts the read, resolves the future with the
    read result, and cancels the future if the stream closes first.
    """
    @functools.wraps(func)
    def wrapped(stream, *args):
        future = tulip.Future()
        func(stream, *args, callback=future.set_result)
        stream.set_close_callback(future.cancel)
        return future
    return wrapped
class AsyncTulipIOStream(TulipIOStream):
    # Future-returning variant: each read_* method yields a tulip.Future
    # instead of taking a callback, so it composes with ``yield from``.
    read_until_regex = async_wrapper(TulipIOStream.read_until_regex)
    read_until = async_wrapper(TulipIOStream.read_until)
    read_bytes = async_wrapper(TulipIOStream.read_bytes)
    read_until_close = async_wrapper(TulipIOStream.read_until_close)
def _double_prefix(deque):
    """Grow by doubling, but don't split the second chunk just because the
    first one is small.
    """
    first_len = len(deque[0])
    second_len = len(deque[1])
    _merge_prefix(deque, max(first_len * 2, first_len + second_len))
def _merge_prefix(deque, size): | |
"""Replace the first entries in a deque of strings with a single | |
string of up to size bytes. | |
>>> d = collections.deque(['abc', 'de', 'fghi', 'j']) | |
>>> _merge_prefix(d, 5); print(d) | |
deque(['abcde', 'fghi', 'j']) | |
Strings will be split as necessary to reach the desired size. | |
>>> _merge_prefix(d, 7); print(d) | |
deque(['abcdefg', 'hi', 'j']) | |
>>> _merge_prefix(d, 3); print(d) | |
deque(['abc', 'defg', 'hi', 'j']) | |
>>> _merge_prefix(d, 100); print(d) | |
deque(['abcdefghij']) | |
""" | |
if len(deque) == 1 and len(deque[0]) <= size: | |
return | |
prefix = [] | |
remaining = size | |
while deque and remaining > 0: | |
chunk = deque.popleft() | |
if len(chunk) > remaining: | |
deque.appendleft(chunk[remaining:]) | |
chunk = chunk[:remaining] | |
prefix.append(chunk) | |
remaining -= len(chunk) | |
# This data structure normally just contains byte strings, but | |
# the unittest gets messy if it doesn't use the default str() type, | |
# so do the merge based on the type of data that's actually present. | |
if prefix: | |
deque.appendleft(type(prefix[0])().join(prefix)) | |
if not deque: | |
deque.appendleft(b"") |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.