Created
April 1, 2018 08:20
-
-
Save xiazhibin/9b223aa0d1775d7eb1b473a72fcfdee8 to your computer and use it in GitHub Desktop.
IO复用异步编程
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def future_add_done_callback(future, callback):
    """Invoke ``callback(future)`` once ``future`` has completed.

    A future that is already done triggers the callback immediately in the
    caller's frame; otherwise the callback is attached to the future and
    fires when the future is resolved.
    """
    if not future.done():
        future.add_done_callback(callback)
        return
    callback(future)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import Future | |
from types import GeneratorType | |
from version2.ioloop import IOLoop | |
class _NullFuture(object):
    """Stand-in future that is always complete and resolves to ``None``."""

    def done(self):
        """Report completion; this placeholder is always done."""
        return True

    def result(self):
        """Return the placeholder's value, which is always ``None``."""
        return None


# Shared singleton; compared by identity in ``convert_yielded``.
_null_future = _NullFuture()
class Runner():
    """Drive a generator-based coroutine until it finishes.

    The runner repeatedly takes the future the generator yielded, waits for
    it to complete, and resumes the generator with the future's result (or
    throws the future's exception into it).  The generator's final outcome
    is delivered through ``result_future``.

    Args:
        gen: the already-started generator (its first value was obtained
            by the caller).
        result_future: future that receives the generator's return value
            or the exception that terminated it.
        first_yield: the first object the generator yielded.
    """

    def __init__(self, gen, result_future, first_yield):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.results = None
        self.running = False
        self.finished = False
        self.io_loop = IOLoop.instance()
        # If the first yielded future is already done, advance right away;
        # otherwise handle_yield has arranged for run() to be called later.
        if self.handle_yield(first_yield):
            self.run()

    def run(self):
        """Resume the generator for as long as its yielded futures are done.

        Returns as soon as the generator yields a future that is still
        pending, or when the generator finishes or fails.
        """
        if self.running or self.finished:
            return
        self.running = True
        try:
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None

                # Collect the outcome of the awaited future first so that a
                # failure can be re-raised *inside* the generator instead of
                # being swallowed here.
                error = None
                value = None
                try:
                    value = future.result()
                except Exception as exc:
                    error = exc

                try:
                    if error is not None:
                        yielded = self.gen.throw(error)
                    else:
                        yielded = self.gen.send(value)
                except StopIteration as stop:
                    # Normal completion: publish the return value.
                    result_future = self._finish()
                    result_future.set_result(stop.value)
                    return
                except Exception as exc:
                    # The generator failed: publish the exception so the
                    # caller's future does not stay pending forever.
                    result_future = self._finish()
                    result_future.set_exception(exc)
                    return
                finally:
                    error = None

                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

    def _finish(self):
        """Mark the runner finished and hand back the result future."""
        self.finished = True
        self.future = _null_future
        result_future = self.result_future
        self.result_future = None
        return result_future

    def handle_yield(self, yielded):
        """Record the object the generator yielded as the awaited future.

        Returns:
            True when the awaited future is already done (the caller should
            keep running), False when ``run`` has been scheduled to fire
            once the future completes.
        """
        try:
            self.future = convert_yielded(yielded)
        except Exception as exc:
            # Turn the conversion failure into an already-failed future so
            # run() throws it into the generator on the next iteration.
            failed = Future()
            failed.set_exception(exc)
            self.future = failed

        if not self.future.done():
            def inner(f):
                f = None  # noqa -- drop the reference before resuming
                self.run()
            self.io_loop.add_future(self.future, inner)
            return False
        return True
def is_future(x):
    """Tell whether ``x`` is a ``concurrent.futures.Future`` instance."""
    result = isinstance(x, Future)
    return result
def convert_yielded(yielded):
    """Return ``yielded`` unchanged if it is something the runner can await.

    Accepted objects are the ``_null_future`` singleton and anything for
    which ``is_future`` is true.

    Raises:
        Exception: for every other kind of object.
    """
    if yielded is _null_future or is_future(yielded):
        return yielded
    raise Exception("yielded unknown object %r" % (yielded,))
def coroutine(func):
    """Decorator turning ``func`` into a callable that returns a Future.

    * If ``func`` returns a plain value, the future resolves to that value.
    * If ``func`` returns a generator, it is advanced once; a generator that
      finishes immediately resolves the future with its return value,
      otherwise a ``Runner`` takes over and resolves the future later.
    * Any exception raised by ``func`` or by the first step of the generator
      is stored on the returned future instead of being lost, so the future
      never stays pending after a failure.
    """
    def wrapped(*args, **kwargs):
        f = Future()
        try:
            rv = func(*args, **kwargs)
        except Exception as e:
            f.set_exception(e)
            return f

        if isinstance(rv, GeneratorType):
            try:
                yielded = next(rv)
            except StopIteration as e:
                # Generator returned before yielding anything.
                f.set_result(e.value)
            except Exception as e:
                f.set_exception(e)
            else:
                Runner(rv, f, yielded)
            return f

        f.set_result(rv)
        return f
    return wrapped
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import selectors | |
import errno | |
import sys | |
import functools | |
from version2.concurrent import future_add_done_callback | |
class IOLoop(object):
    """Small single-threaded event loop built on ``selectors``.

    The loop keeps a table of per-descriptor handlers and a FIFO queue of
    callbacks.  Each iteration runs the callbacks that were queued when the
    iteration began, then polls the selector and dispatches ready events.
    """

    @classmethod
    def instance(cls):
        """Return the shared loop, creating it on first use."""
        if not hasattr(cls, '_instance'):
            cls._instance = IOLoop()
        return cls._instance

    def __init__(self):
        self.stlc = selectors.DefaultSelector()
        self._handlers = {}
        self._callbacks = collections.deque()
        # Set by stop(); checked by start() once per iteration.
        self._stopped = False

    def add_handler(self, fd, events, handler):
        """Register ``handler(fd, events)`` for ``events`` on ``fd``.

        Registration errors are printed rather than raised.  The handler is
        recorded only after the selector accepted the descriptor, so a
        failed registration leaves no stale entry behind.
        """
        try:
            self.stlc.register(fd, events)
        except Exception as e:
            print(e)
        else:
            self._handlers[fd] = handler

    def start(self):
        """Run the loop until ``stop`` is called."""
        while not self._stopped:
            # Only run callbacks that were queued before this iteration;
            # ones added meanwhile wait for the next pass.
            for _ in range(len(self._callbacks)):
                self._run_callback(self._callbacks.popleft())
            if self._stopped:
                break

            # Do not sleep in the selector while callbacks are waiting.
            poll_timeout = 0.0 if self._callbacks else 0.1
            for key, events in self.stlc.select(poll_timeout):
                fd = key.fileobj
                handler = self._handlers.get(fd)
                if handler is None:
                    # Removed by an earlier handler in this same batch.
                    continue
                try:
                    handler(fd, events)
                except (OSError, IOError) as e:
                    # A broken pipe is ignored silently; everything else
                    # is reported.
                    if e.errno != errno.EPIPE:
                        print(e)
                except Exception as ee:
                    print(ee)
        # Allow the loop to be started again later.
        self._stopped = False

    def stop(self):
        """Ask ``start`` to return after the current iteration."""
        self._stopped = True

    def remove_handler(self, fd):
        """Forget the handler for ``fd`` and unregister it if it was known."""
        rv = self._handlers.pop(fd, None)
        if rv is not None:
            self.stlc.unregister(fd)

    def update_handler(self, fd, events):
        """Change the event mask the selector watches for ``fd``."""
        self.stlc.modify(fd, events)

    def add_callback(self, callback, *args, **kwargs):
        """Queue ``callback(*args, **kwargs)`` for a later loop iteration."""
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        """Queue ``callback(future)`` on the loop once ``future`` is done."""
        future_add_done_callback(
            future, lambda done: self.add_callback(callback, done))

    def _run_callback(self, callback):
        """Runs a callback with error handling.

        For use in subclasses.

        If the callback returns something other than ``None`` it is treated
        as a yielded future; once it completes, its result is retrieved so
        that an exception it carries gets reported.
        """
        try:
            ret = callback()
            if ret is not None:
                try:
                    # Imported here rather than at module level, as in the
                    # original code.
                    from version2 import gen
                    ret = gen.convert_yielded(ret)
                except Exception as e:
                    print(e)
                else:
                    self.add_future(ret, self._discard_future_result)
        except Exception as ee:
            print(ee)

    def _discard_future_result(self, future):
        """Retrieve the result so a stored exception is raised and reported."""
        future.result()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import numbers | |
import selectors | |
import socket | |
import sys | |
import errno | |
from concurrent.futures import Future | |
from version2.ioloop import IOLoop | |
# errno values the stream treats as "no data / cannot write right now".
_ERRNO_WOULDBLOCK = (
    errno.EWOULDBLOCK,
    errno.EAGAIN,
)
# errno values the stream treats as a connection reset by the peer.
_ERRNO_CONNRESET = (
    errno.ECONNRESET,
    errno.ECONNABORTED,
    errno.EPIPE,
)
def _double_prefix(deque):
    """Grow the first chunk of ``deque`` by merging following data into it.

    The new first chunk is at least twice as long as the current one, and
    at least as long as the first two chunks combined.  The deque must hold
    at least two chunks.
    """
    doubled = len(deque[0]) * 2
    first_two = len(deque[0]) + len(deque[1])
    _merge_prefix(deque, max(doubled, first_two))
def _merge_prefix(deque, size):
    """Rearrange ``deque`` so its first element holds exactly ``size`` bytes.

    Chunks are taken from the left and joined; a chunk that overshoots is
    split and its tail is pushed back.  If the deque holds fewer than
    ``size`` bytes, everything is merged into a single element.  An empty
    deque ends up containing one empty bytes object.
    """
    # Fast path: a single chunk that is already short enough.
    if len(deque) == 1 and len(deque[0]) <= size:
        return

    pieces = []
    still_needed = size
    while still_needed > 0 and deque:
        head = deque.popleft()
        if len(head) > still_needed:
            # Keep only what is needed; return the rest to the deque.
            deque.appendleft(head[still_needed:])
            head = head[:still_needed]
        pieces.append(head)
        still_needed -= len(head)

    if pieces:
        # Join with an empty object of the same type as the chunks.
        empty = type(pieces[0])()
        deque.appendleft(empty.join(pieces))
    if not deque:
        deque.appendleft(b"")
class _StreamBuffer(object):
    """FIFO byte buffer made of a deque of chunks.

    Each deque entry is a ``(is_memoryview, data)`` pair: large payloads are
    stored as memoryviews, small ones are accumulated in bytearrays.
    ``_first_pos`` is the read offset inside the first chunk and ``_size``
    the total number of unread bytes.
    """

    # Payloads longer than this are stored as their own memoryview chunk.
    _large_buf_threshold = 2048

    def __init__(self):
        self._buffers = collections.deque()
        self._first_pos = 0
        self._size = 0

    def __len__(self):
        return self._size

    def append(self, data):
        """Add ``data`` to the end of the buffer."""
        nbytes = len(data)
        if nbytes > self._large_buf_threshold:
            view = data if isinstance(data, memoryview) else memoryview(data)
            self._buffers.append((True, view))
        elif nbytes > 0:
            start_new_chunk = True
            if self._buffers:
                last_is_view, b = self._buffers[-1]
                # Extend the last chunk only if it is a bytearray that has
                # not yet reached the threshold.
                start_new_chunk = (
                    last_is_view or len(b) >= self._large_buf_threshold)
            if start_new_chunk:
                self._buffers.append((False, bytearray(data)))
            else:
                b += data
        self._size += nbytes

    def peek(self, size):
        """Return a memoryview of at most ``size`` bytes from the first chunk."""
        assert size > 0
        if not self._buffers:
            return memoryview(b'')
        first_is_view, chunk = self._buffers[0]
        start = self._first_pos
        stop = start + size
        if not first_is_view:
            chunk = memoryview(chunk)
        return chunk[start:stop]

    def advance(self, size):
        """Discard the first ``size`` bytes of the buffer."""
        assert 0 < size <= self._size
        self._size -= size
        offset = self._first_pos
        chunks = self._buffers
        while chunks and size > 0:
            chunk_is_view, chunk = chunks[0]
            left_over = len(chunk) - size - offset
            if left_over <= 0:
                # The whole first chunk is consumed.
                chunks.popleft()
                size -= len(chunk) - offset
                offset = 0
                continue
            offset += size
            size = 0
            if not chunk_is_view and len(chunk) <= 2 * offset:
                # Amortized O(1) shrink: drop the consumed prefix once it
                # makes up at least half of the bytearray.
                del chunk[:offset]
                offset = 0
        assert size == 0
        self._first_pos = offset
class IOStream(object):
    """Buffered, future-based reader/writer around a non-blocking socket.

    ``read_bytes`` and ``write`` return ``concurrent.futures.Future``
    objects that complete when the requested amount of data has been read
    or the queued data has been sent.  The stream registers itself with the
    shared ``IOLoop`` and reacts to read/write readiness events.  When the
    stream closes, futures that are still pending fail with an exception
    instead of staying unresolved.
    """

    def __init__(self, sock):
        self.max_buffer_size = 3 * 1024
        self.sock = sock
        self._read_buffer = collections.deque()
        self._write_buffer = _StreamBuffer()
        self._read_buffer_size = 0
        self._read_future = None
        self._write_futures = collections.deque()
        self._total_write_done_index = 0
        self.max_write_buffer_size = None
        self._total_write_index = 0
        self._read_delimiter = False
        self._close_callback = None
        self._closed = False
        self.error = None
        self._read_max_bytes = 3 * 1024
        self._read_until_close = False
        self.read_chunk_size = 1024
        self._read_bytes = 0
        self._pending_callbacks = 0
        self._write_buffer_frozen = False
        self._write_callback = None
        # Event mask currently registered with the loop; None = unregistered.
        self._state = None
        self.io_loop = IOLoop.instance()

    # ------------------------------------------------------------------
    # Event dispatch
    # ------------------------------------------------------------------
    def _handle_events(self, fd, events):
        """Loop handler: service read and/or write readiness on the socket."""
        if self.closed():
            print("Got events for closed stream %s" % (fd,))
            return
        try:
            if events & selectors.EVENT_READ:
                self._handle_read()
            if self.closed():
                return
            if events & selectors.EVENT_WRITE:
                self._handle_write()
            if self.closed():
                return
            # Recompute the interest mask: keep listening for reads, but
            # stop asking for write readiness once the buffer has drained,
            # otherwise a writable socket wakes the loop on every pass.
            state = selectors.EVENT_READ
            if self.writing():
                state |= selectors.EVENT_WRITE
            if self._state is not None and state != self._state:
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except Exception:
            self.close(exc_info=True)
            raise

    def _handle_write(self):
        """Send as much buffered data as the socket accepts right now."""
        while self._write_buffer:
            size = len(self._write_buffer)
            try:
                num_bytes = self.write_to_fd(self._write_buffer.peek(size))
                if num_bytes == 0:
                    break
                self._write_buffer.advance(num_bytes)
                self._total_write_done_index += num_bytes
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    break
                self.close(exc_info=e)
                return

        # Resolve every write future whose data has been fully sent.
        while self._write_futures:
            index, future = self._write_futures[0]
            if index > self._total_write_done_index:
                break
            self._write_futures.popleft()
            future.set_result(None)

    def _handle_read(self):
        """Pull available data into the buffer and satisfy a pending read."""
        try:
            pos = self._read_to_buffer_loop()
        except Exception as ww:
            # Report and let _handle_events close the stream.
            print(ww)
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._maybe_run_close_callback()

    # ------------------------------------------------------------------
    # Closing
    # ------------------------------------------------------------------
    def close(self, exc_info=False):
        """Close the stream and the underlying socket.

        Args:
            exc_info: false for a plain close; an exception instance, an
                ``exc_info`` tuple, or any other true value (meaning "use
                the exception currently being handled") to record in
                ``self.error``.
        """
        if not self.closed():
            if exc_info:
                if isinstance(exc_info, BaseException):
                    self.error = exc_info
                else:
                    if not isinstance(exc_info, tuple):
                        exc_info = sys.exc_info()
                    if any(exc_info):
                        self.error = exc_info[1]
            if self._state is not None:
                self._state = None
                self.io_loop.remove_handler(self.fileno())
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()

    def close_fd(self):
        """Close the socket and drop the reference to it."""
        self.sock.close()
        self.sock = None

    def _maybe_run_close_callback(self):
        """Finish closing once no read loop is still in progress.

        Fails any pending read/write futures, runs the close callback (at
        most once) and releases the write buffer.
        """
        if self.closed() and self._pending_callbacks == 0:
            unfinished = []
            if self._read_future is not None:
                unfinished.append(self._read_future)
                self._read_future = None
            unfinished.extend(future for _, future in self._write_futures)
            self._write_futures.clear()
            for future in unfinished:
                error = Exception("Stream is closed")
                error.__cause__ = self.error
                future.set_exception(error)

            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            self._read_callback = self._write_callback = None
            self._write_buffer = None

    def set_close_callback(self, callback):
        """Register a zero-argument callable to run when the stream closes."""
        self._close_callback = callback

    def closed(self):
        """Return True once the stream has been closed."""
        return self._closed

    def _check_closed(self):
        """Raise if the stream has already been closed."""
        if self.closed():
            raise Exception("Stream is closed")

    # ------------------------------------------------------------------
    # Reading
    # ------------------------------------------------------------------
    def read_bytes(self, num_bytes):
        """Return a future resolving to exactly ``num_bytes`` bytes."""
        # Validate before touching state so a bad argument does not leave a
        # dangling read future behind.
        assert isinstance(num_bytes, numbers.Integral)
        f = self._set_read_future()
        self._read_bytes = num_bytes
        self._try_inline_read()
        return f

    def _set_read_future(self):
        """Create and remember the future for the read being started."""
        f = Future()
        self._read_future = f
        return f

    def _try_inline_read(self):
        """Try to satisfy the pending read without waiting for the loop."""
        # First see whether already-buffered data is enough.
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        if self.closed():
            # The peer went away during the inline read: finish closing so
            # the close callback runs and the read future fails.
            self._maybe_run_close_callback()
        else:
            self._maybe_add_error_listener()

    def _read_to_buffer_loop(self):
        """Read from the socket until the pending read can be satisfied.

        This method is called from _handle_read and _try_inline_read.

        Returns:
            The number of bytes to consume, or None if the buffered data
            does not satisfy the pending read yet.
        """
        # Incremented outside the try so the finally clause always balances.
        self._pending_callbacks += 1
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            while not self.closed():
                if self._read_to_buffer() == 0:
                    break
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break
                # Re-check for a match only after the buffer has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _read_to_buffer(self):
        """Read one chunk from the socket into the read buffer.

        Returns:
            The number of bytes read, 0 when nothing was available (or the
            peer closed), or None after closing on a connection reset.

        Raises:
            IOError: when the read buffer reaches ``max_buffer_size``.
        """
        try:
            chunk = self.read_from_fd()
        except (socket.error, IOError, OSError) as e:
            self.close(exc_info=True)
            if errno_from_exception(e) in _ERRNO_CONNRESET:
                return
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size >= self.max_buffer_size:
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)

    def read_from_fd(self):
        """Receive up to ``read_chunk_size`` bytes.

        Returns None when no data is available or when the peer has closed
        the connection (in which case the stream is closed too).
        """
        try:
            chunk = self.sock.recv(self.read_chunk_size)
        except socket.error as e:
            if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                return None
            raise
        if not chunk:
            self.close()
            return None
        return chunk

    def _find_read_pos(self):
        """Return how many buffered bytes satisfy the pending read, if any."""
        if (self._read_bytes is not None and
                self._read_buffer_size >= self._read_bytes):
            return min(self._read_bytes, self._read_buffer_size)
        return None

    def _check_max_bytes(self, delimiter, size):
        """Raise if ``size`` exceeds the configured ``_read_max_bytes``."""
        if (self._read_max_bytes is not None and
                size > self._read_max_bytes):
            raise Exception(
                "delimiter %r not found within %d bytes" % (
                    delimiter, self._read_max_bytes))

    def _read_from_buffer(self, pos):
        """Complete the pending read with the first ``pos`` buffered bytes."""
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._run_read_callback(pos)

    def _run_read_callback(self, size):
        """Consume ``size`` bytes and resolve the read future with them."""
        result = self._consume(size)
        if self._read_future is not None:
            future = self._read_future
            self._read_future = None
            future.set_result(result)
        self._maybe_add_error_listener()

    def _consume(self, loc):
        """Remove and return the first ``loc`` bytes of the read buffer."""
        if loc == 0:
            return b""
        _merge_prefix(self._read_buffer, loc)
        self._read_buffer_size -= loc
        return self._read_buffer.popleft()

    def reading(self):
        """Return True while a read future is outstanding."""
        return self._read_future is not None

    # ------------------------------------------------------------------
    # Writing
    # ------------------------------------------------------------------
    def write(self, data):
        """Queue ``data`` for sending; return a future for its completion.

        Raises:
            Exception: if the stream is closed or the write buffer limit
                would be exceeded.
        """
        self._check_closed()
        if data:
            if (self.max_write_buffer_size is not None and
                    len(self._write_buffer) + len(data) >
                    self.max_write_buffer_size):
                raise Exception("Reached maximum write buffer size")
            self._write_buffer.append(data)
            self._total_write_index += len(data)
        future = Future()
        # Mark the exception as retrieved so an unobserved failure is quiet.
        future.add_done_callback(lambda f: f.exception())
        self._write_futures.append((self._total_write_index, future))
        # Try to send immediately; fall back to the loop for the remainder.
        self._handle_write()
        if self._write_buffer:
            self._add_io_state(selectors.EVENT_WRITE)
        self._maybe_add_error_listener()
        return future

    def write_to_fd(self, data):
        """Send ``data`` on the socket; return the number of bytes sent."""
        return self.sock.send(data)

    def writing(self):
        """Return True while unsent data remains in the write buffer."""
        return bool(self._write_buffer)

    # ------------------------------------------------------------------
    # Loop registration and helpers
    # ------------------------------------------------------------------
    def _add_io_state(self, state):
        """Make sure the loop watches the socket for ``state``.

        Read interest is always included on first registration so that a
        peer close is noticed even when only a write is pending.
        """
        if self.closed():
            return
        if self._state is None:
            self._state = selectors.EVENT_READ | state
            self.io_loop.add_handler(
                self.fileno(), self._state, self._handle_events)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    def _maybe_add_error_listener(self):
        """Register for read events if the stream is idle and unregistered."""
        if self._state is None and self._pending_callbacks == 0:
            if not self.closed():
                self._add_io_state(selectors.EVENT_READ)

    def fileno(self):
        """Return the socket's file descriptor."""
        return self.sock.fileno()

    def _run_callback(self, callback, *args, **kwargs):
        """Call ``callback``; close the stream and re-raise if it fails."""
        try:
            callback(*args, **kwargs)
        except BaseException:
            self.close()
            raise

    def _is_connreset(self, exc):
        """Return True if ``exc`` reports a connection reset by the peer."""
        return (isinstance(exc, (socket.error, IOError)) and
                errno_from_exception(exc) in _ERRNO_CONNRESET)
def errno_from_exception(e):
    """Extract an errno-like value from exception ``e``.

    Uses the ``errno`` attribute when the exception has one (even if it is
    ``None``); otherwise falls back to the first positional argument, or
    ``None`` when there are no arguments.
    """
    try:
        return e.errno  # type: ignore
    except AttributeError:
        pass
    return e.args[0] if e.args else None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import selectors | |
import socket | |
from version2 import gen | |
from version2.ioloop import IOLoop | |
from version2.iostream import IOStream | |
class Server(object):
    """Base class for a non-blocking TCP server driven by the shared IOLoop.

    Subclasses implement ``handle_stream`` to serve one accepted
    connection.
    """

    def __init__(self):
        self.server_sock = None

    def start(self, address=('127.0.0.1', 5055)):
        """Bind a listening socket and register it with the loop.

        Args:
            address: ``(host, port)`` pair to listen on.
        """
        sock = socket.socket()
        try:
            sock.setblocking(False)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(address)
            sock.listen(128)
        except Exception:
            # Do not leak the descriptor when binding/listening fails.
            sock.close()
            raise
        self.server_sock = sock
        IOLoop.instance().add_handler(
            sock.fileno(), selectors.EVENT_READ, self.on_accept)

    def on_accept(self, fd, events):
        """Loop handler: accept one connection and start serving it."""
        try:
            conn, address = self.server_sock.accept()
        except OSError as e:
            print(e)
        else:
            # The stream's read/write paths expect a non-blocking socket;
            # a blocking recv would stall the whole event loop.
            conn.setblocking(False)
            stream = IOStream(conn)
            future = self.handle_stream(stream)
            # Retrieve the result once done so a failure gets reported.
            IOLoop.instance().add_future(
                gen.convert_yielded(future), lambda f: f.result())

    def handle_stream(self, stream):
        """Serve one connection; must be provided by subclasses."""
        raise NotImplementedError

    def close(self):
        """Stop listening and release the server socket, if any."""
        sock = self.server_sock
        if sock is None:
            return
        self.server_sock = None
        # The handler was registered under the descriptor number, which is
        # only available while the socket is still open.
        IOLoop.instance().remove_handler(sock.fileno())
        sock.close()
class EchoServer(Server):
    """Server that echoes back length-prefixed messages.

    Each message is a 4-byte big-endian length followed by that many bytes
    of payload; only the payload is written back.
    """

    @gen.coroutine
    def handle_stream(self, stream):
        """Echo messages from ``stream`` in an endless loop."""
        stream.set_close_callback(self.on_close)
        while True:
            header = yield stream.read_bytes(4)
            payload_size = int.from_bytes(header, byteorder='big')
            payload = yield stream.read_bytes(payload_size)
            yield stream.write(payload)

    def on_close(self):
        """Close callback for a stream: report that it was closed."""
        print('closed')
# Script entry: create the echo server, start listening, then hand control
# to the shared event loop.
server = EchoServer()
server.start()
IOLoop.instance().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment