Skip to content

Instantly share code, notes, and snippets.

@xiazhibin
Created April 1, 2018 08:20
Show Gist options
  • Save xiazhibin/9b223aa0d1775d7eb1b473a72fcfdee8 to your computer and use it in GitHub Desktop.
Save xiazhibin/9b223aa0d1775d7eb1b473a72fcfdee8 to your computer and use it in GitHub Desktop.
IO复用异步编程
def future_add_done_callback(future, callback):
    """Arrange for ``callback(future)`` to run once ``future`` is done.

    A future that has already completed gets the callback invoked
    synchronously, right here. Otherwise the callback is attached through
    the future's own ``add_done_callback``.
    """
    if not future.done():
        future.add_done_callback(callback)
        return
    callback(future)
from concurrent.futures import Future
from types import GeneratorType
from version2.ioloop import IOLoop
class _NullFuture(object):
def result(self):
return None
def done(self):
return True
_null_future = _NullFuture()
class Runner():
    """Drive a generator-based coroutine until it finishes.

    Each object the generator yields is converted to a future. When that
    future completes, its result is sent back into the generator, or its
    exception is thrown into it. When the generator returns, the return
    value is stored on ``result_future``; when it raises,
    ``result_future`` receives the exception.

    Args:
        gen: the generator being driven.
        result_future: future resolved with the generator's outcome.
        first_yield: the value already obtained from the first ``next()``.
    """

    def __init__(self, gen, result_future, first_yield):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.results = None
        self.running = False
        self.finished = False
        self.io_loop = IOLoop.instance()
        # If the first yielded future is already done, keep going right away.
        if self.handle_yield(first_yield):
            self.run()

    def run(self):
        """Advance the generator until it finishes or yields a pending future."""
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    try:
                        value = future.result()
                    except Exception as e:
                        # Hand the failure to the coroutine so it can deal
                        # with it (or let it propagate) instead of leaving
                        # ``yielded`` unbound or stale.
                        yielded = self.gen.throw(e)
                    else:
                        yielded = self.gen.send(value)
                except StopIteration as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(e.value)
                    self.result_future = None
                    return
                except Exception as e:
                    self.finished = True
                    self.future = _null_future
                    print(e)
                    # Resolve the outer future so waiters are not left
                    # hanging forever.
                    self.result_future.set_exception(e)
                    self.result_future = None
                    return
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

    def handle_yield(self, yielded):
        """Store the future for ``yielded``.

        Returns:
            True when the future is already done (the caller may continue
            immediately). False when a wake-up was scheduled on the IO
            loop instead.
        """
        try:
            self.future = convert_yielded(yielded)
        except Exception as e:
            print(e)
            # Represent the bad yield as a failed future; ``run`` will
            # throw the error into the generator on its next step.
            failed = Future()
            failed.set_exception(e)
            self.future = failed
        if not self.future.done():
            def inner(f):
                f = None  # noqa
                self.run()
            self.io_loop.add_future(self.future, inner)
            return False
        return True
def is_future(x):
    """Tell whether ``x`` is a ``concurrent.futures.Future`` instance."""
    matches = isinstance(x, Future)
    return matches
def convert_yielded(yielded):
    """Return ``yielded`` unchanged if it is an acceptable future.

    Accepted values are the module's null-future placeholder and anything
    ``is_future`` recognises.

    Raises:
        Exception: for any other object.
    """
    if yielded is _null_future or is_future(yielded):
        return yielded
    raise Exception("yielded unknown object %r" % (yielded,))
def coroutine(func):
    """Decorator turning ``func`` into a callable that returns a ``Future``.

    * If ``func`` returns a plain value, the future resolves to it.
    * If ``func`` returns a generator, the generator is started; when it
      yields, a ``Runner`` takes over and resolves the future later.
    * If ``func`` (or the generator's first step) raises, the exception is
      printed and stored on the returned future instead of being lost.
    """
    # Local import: the surrounding module does not import functools.
    import functools

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        f = Future()
        try:
            rv = func(*args, **kwargs)
        except Exception as e:
            print(e)
            # Previously this fell through with ``rv`` unbound.
            f.set_exception(e)
            return f
        if isinstance(rv, GeneratorType):
            try:
                yielded = next(rv)
            except StopIteration as e:
                # Generator finished without ever yielding.
                f.set_result(e.value)
            except Exception as e:
                print(e)
                f.set_exception(e)
            else:
                Runner(rv, f, yielded)
            return f
        f.set_result(rv)
        return f
    return wrapped
import collections
import selectors
import errno
import sys
import functools
from version2.concurrent import future_add_done_callback
class IOLoop(object):
    """Small event loop built on ``selectors``.

    It keeps a map of file descriptor -> handler and a queue of callbacks.
    ``start`` alternates between draining the callbacks that were queued
    and polling the selector for ready file descriptors.
    """

    @classmethod
    def instance(cls):
        """Return the shared loop, creating it on first use."""
        if not hasattr(cls, '_instance'):
            cls._instance = IOLoop()
        return cls._instance

    def __init__(self):
        self.stlc = selectors.DefaultSelector()
        self._handlers = {}
        self._callbacks = collections.deque()

    def add_handler(self, fd, events, handler):
        """Register ``handler(fd, events)`` for ``events`` on ``fd``.

        Registration errors are printed, not raised. The handler is only
        recorded after the selector accepted the fd, so a failed
        registration leaves no stale entry behind.
        """
        try:
            self.stlc.register(fd, events)
        except Exception as e:
            print(e)
            return
        self._handlers[fd] = handler

    def start(self):
        """Run the loop forever."""
        while True:
            # Only run the callbacks present at the start of this pass;
            # ones queued meanwhile wait for the next pass.
            ncallbacks = len(self._callbacks)
            for _ in range(ncallbacks):
                self._run_callback(self._callbacks.popleft())
            # Do not sleep in select() when more callbacks are waiting.
            poll_timeout = 0.0 if self._callbacks else 0.1
            for key, events in self.stlc.select(poll_timeout):
                fd = key.fileobj
                handler = self._handlers.get(fd)
                if handler is None:
                    # Removed by a handler that ran earlier in this batch.
                    continue
                try:
                    handler(fd, events)
                except (OSError, IOError) as e:
                    code = e.errno
                    if code is None and e.args:
                        code = e.args[0]
                    # A broken pipe is ignored silently; anything else is
                    # reported.
                    if code != errno.EPIPE:
                        print(e)
                except Exception as ee:
                    print(ee)

    def remove_handler(self, fd):
        """Forget the handler for ``fd`` and unregister it, if known."""
        rv = self._handlers.pop(fd, None)
        if rv is not None:
            self.stlc.unregister(fd)

    def update_handler(self, fd, events):
        """Change the event mask the selector watches for ``fd``."""
        self.stlc.modify(fd, events)

    def add_callback(self, callback, *args, **kwargs):
        """Queue ``callback(*args, **kwargs)`` for a later loop pass."""
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        """Queue ``callback(future)`` on the loop once ``future`` is done."""
        future_add_done_callback(future, lambda future: self.add_callback(callback, future))

    def _run_callback(self, callback):
        """Runs a callback with error handling.

        If the callback returns something, it is converted to a future
        whose result is later retrieved so that errors are not dropped
        silently. All exceptions are printed rather than propagated.
        """
        try:
            ret = callback()
            if ret is not None:
                try:
                    from version2 import gen
                    ret = gen.convert_yielded(ret)
                except Exception as e:
                    print(e)
                else:
                    self.add_future(ret, self._discard_future_result)
        except Exception as ee:
            print(ee)

    def _discard_future_result(self, future):
        """Retrieve the result so a stored exception is raised to the caller."""
        future.result()
import collections
import numbers
import selectors
import socket
import sys
import errno
from concurrent.futures import Future
from version2.ioloop import IOLoop
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE)
def _double_prefix(deque):
    """Grow the first chunk of ``deque`` by merging in following data.

    The first chunk's target length is the larger of twice its current
    length and the combined length of the first two chunks.
    """
    first_len = len(deque[0])
    target = max(first_len * 2, first_len + len(deque[1]))
    _merge_prefix(deque, target)
def _merge_prefix(deque, size):
if len(deque) == 1 and len(deque[0]) <= size:
return
prefix = []
remaining = size
while deque and remaining > 0:
chunk = deque.popleft()
if len(chunk) > remaining:
deque.appendleft(chunk[remaining:])
chunk = chunk[:remaining]
prefix.append(chunk)
remaining -= len(chunk)
if prefix:
deque.appendleft(type(prefix[0])().join(prefix))
if not deque:
deque.appendleft(b"")
class _StreamBuffer(object):
def __init__(self):
self._buffers = collections.deque()
self._first_pos = 0
self._size = 0
def __len__(self):
return self._size
_large_buf_threshold = 2048
def append(self, data):
size = len(data)
if size > self._large_buf_threshold:
if not isinstance(data, memoryview):
data = memoryview(data)
self._buffers.append((True, data))
elif size > 0:
if self._buffers:
is_memview, b = self._buffers[-1]
new_buf = is_memview or len(b) >= self._large_buf_threshold
else:
new_buf = True
if new_buf:
self._buffers.append((False, bytearray(data)))
else:
b += data
self._size += size
def peek(self, size):
assert size > 0
try:
is_memview, b = self._buffers[0]
except IndexError:
return memoryview(b'')
pos = self._first_pos
if is_memview:
return b[pos:pos + size]
else:
return memoryview(b)[pos:pos + size]
def advance(self, size):
assert 0 < size <= self._size
self._size -= size
pos = self._first_pos
buffers = self._buffers
while buffers and size > 0:
is_large, b = buffers[0]
b_remain = len(b) - size - pos
if b_remain <= 0:
buffers.popleft()
size -= len(b) - pos
pos = 0
elif is_large:
pos += size
size = 0
else:
# Amortized O(1) shrink for Python 2
pos += size
if len(b) <= 2 * pos:
del b[:pos]
pos = 0
size = 0
assert size == 0
self._first_pos = pos
class IOStream(object):
    """Future-based read/write wrapper around a socket, driven by ``IOLoop``.

    ``read_bytes`` and ``write`` return futures. Incoming data is collected
    in ``_read_buffer`` (a deque of chunks) and outgoing data in
    ``_write_buffer``. The stream registers ``_handle_events`` with the IO
    loop for its file descriptor and always keeps read interest once
    registered; write interest is added while data is queued and dropped
    again when the write buffer is empty.
    """

    def __init__(self, sock):
        self.max_buffer_size = 3 * 1024
        self.sock = sock
        self._read_buffer = collections.deque()
        self._write_buffer = _StreamBuffer()
        self._read_buffer_size = 0
        self._read_future = None
        # (cumulative byte index, future) pairs, resolved in order as the
        # corresponding bytes have been written.
        self._write_futures = collections.deque()
        self._total_write_done_index = 0
        self.max_write_buffer_size = None
        self._total_write_index = 0
        self._read_delimiter = False
        self._read_regex = None
        self._close_callback = None
        self._closed = False
        self.error = None
        self._read_max_bytes = 3 * 1024
        self._read_until_close = False
        self.read_chunk_size = 1024
        self._read_bytes = 0
        self._pending_callbacks = 0
        self._write_buffer_frozen = False
        self._read_callback = None
        self._write_callback = None
        # Event mask currently registered with the IO loop (None = none).
        self._state = None
        self.io_loop = IOLoop.instance()

    def _handle_events(self, fd, events):
        """IO loop entry point: dispatch readiness events for this stream.

        Any exception closes the stream (recording the error) and is
        re-raised to the caller.
        """
        if self.closed():
            print("Got events for closed stream %d" % fd)
            return
        try:
            if events & selectors.EVENT_READ:
                self._handle_read()
            if self.closed():
                return
            if events & selectors.EVENT_WRITE:
                self._handle_write()
            if self.closed():
                return
            # Keep read interest; listen for writability only while data
            # is still queued, otherwise the loop would spin on a socket
            # that is always writable.
            state = selectors.EVENT_READ
            if self.writing():
                state |= selectors.EVENT_WRITE
            if self._state is not None and state != self._state:
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except Exception:
            self.close(exc_info=True)
            raise

    def _handle_write(self):
        """Send as much queued data as the socket accepts, then resolve
        the futures of writes that have been fully sent."""
        while self._write_buffer:
            size = len(self._write_buffer)
            try:
                num_bytes = self.write_to_fd(self._write_buffer.peek(size))
                if num_bytes == 0:
                    break
                self._write_buffer.advance(num_bytes)
                self._total_write_done_index += num_bytes
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    break
                if not self._is_connreset(e):
                    # Resets are routine; report everything else.
                    print(e)
                self.close(exc_info=e)
                return
        while self._write_futures:
            index, future = self._write_futures[0]
            if index > self._total_write_done_index:
                break
            self._write_futures.popleft()
            future.set_result(None)

    def close(self, exc_info=False):
        """Close the stream.

        Args:
            exc_info: falsy for a normal close; an exception instance, an
                ``exc_info`` tuple, or any other truthy value (meaning
                "use ``sys.exc_info()``") to record the error on
                ``self.error``.
        """
        if not self.closed():
            if exc_info:
                if isinstance(exc_info, BaseException):
                    self.error = exc_info
                else:
                    if not isinstance(exc_info, tuple):
                        exc_info = sys.exc_info()
                    if any(exc_info):
                        self.error = exc_info[1]
            if self._state is not None:
                self._state = None
                # Must happen before close_fd(), which drops the socket.
                self.io_loop.remove_handler(self.fileno())
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()

    def close_fd(self):
        """Close and forget the underlying socket."""
        self.sock.close()
        self.sock = None

    def _handle_read(self):
        """Pull available data into the buffer and complete a pending read."""
        try:
            pos = self._read_to_buffer_loop()
        except Exception as ww:
            print(ww)
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    def _maybe_run_close_callback(self):
        """Run the close callback once the stream is closed and idle."""
        if self.closed() and self._pending_callbacks == 0:
            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            self._read_callback = self._write_callback = None
            self._write_buffer = None

    def _read_to_buffer_loop(self):
        """Read from the socket until it would block, the target size is
        reached, or the pending read can be satisfied.

        Returns:
            The number of bytes to hand to the pending read, or ``None``.
        """
        # This method is called from _handle_read and _try_inline_read.
        # Counted as pending so close() does not fire the close callback
        # in the middle of the loop.
        self._pending_callbacks += 1
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            while not self.closed():
                if self._read_to_buffer() == 0:
                    break
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _find_read_pos(self):
        """Return how many buffered bytes satisfy the current read, or None."""
        if (self._read_bytes is not None and
                (self._read_buffer_size >= self._read_bytes)):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        return None

    def _check_max_bytes(self, delimiter, size):
        """Raise if ``size`` exceeds ``_read_max_bytes``."""
        if (self._read_max_bytes is not None and
                size > self._read_max_bytes):
            raise Exception(
                "delimiter %r not found within %d bytes" % (
                    delimiter, self._read_max_bytes))

    def fileno(self):
        """Return the socket's file descriptor."""
        return self.sock.fileno()

    def _read_from_buffer(self, pos):
        """Clear the read target and deliver ``pos`` buffered bytes."""
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._run_read_callback(pos)

    def _run_read_callback(self, size):
        """Consume ``size`` bytes and resolve the pending read future."""
        result = self._consume(size)
        if self._read_future is not None:
            future = self._read_future
            self._read_future = None
            future.set_result(result)
        self._maybe_add_error_listener()

    def _set_read_future(self):
        """Create, remember and return the future for a new read."""
        f = Future()
        self._read_future = f
        return f

    def _add_io_state(self, state):
        """Make sure the IO loop watches this stream for ``state``.

        The first call registers ``_handle_events`` for read events plus
        ``state``; later calls widen the registered mask when needed.
        """
        if self.closed():
            return
        if self._state is None:
            self._state = selectors.EVENT_READ | state
            self.io_loop.add_handler(self.fileno(), self._state, self._handle_events)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    def read_from_fd(self):
        """Receive up to ``read_chunk_size`` bytes.

        Returns:
            The data, or ``None`` when nothing is available right now or
            the peer closed the connection (in which case the stream is
            closed as well).
        """
        try:
            chunk = self.sock.recv(self.read_chunk_size)
        except socket.error as e:
            if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                return None
            raise
        if not chunk:
            self.close()
            return None
        return chunk

    def write_to_fd(self, data):
        """Send ``data`` on the socket; return the number of bytes sent."""
        return self.sock.send(data)

    def _read_to_buffer(self):
        """Read one chunk into the read buffer.

        Returns:
            The chunk length, ``0`` when no data was read, or ``None``
            after a connection reset (the stream is closed then).

        Raises:
            IOError: when the buffer reaches ``max_buffer_size``.
        """
        try:
            chunk = self.read_from_fd()
        except (socket.error, IOError, OSError) as e:
            self.close(exc_info=True)
            if self._is_connreset(e):
                return None
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size >= self.max_buffer_size:
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)

    def _try_inline_read(self):
        """Try to satisfy the pending read right away, first from the
        buffer and then from the socket; otherwise start listening."""
        pos = self._find_read_pos()
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            pos = self._read_to_buffer_loop()
        except Exception:
            self._maybe_run_close_callback()
            raise
        if pos is not None:
            self._read_from_buffer(pos)
            return
        self._maybe_add_error_listener()

    def _maybe_add_error_listener(self):
        """Register for read events if the stream is not registered yet."""
        if self._state is None and self._pending_callbacks == 0:
            if not self.closed():
                self._add_io_state(selectors.EVENT_READ)

    def _check_closed(self):
        """Raise if the stream has been closed."""
        if self.closed():
            raise Exception("Stream is closed")

    def closed(self):
        """Return True once the stream has been closed."""
        return self._closed

    def _run_callback(self, callback, *args, **kwargs):
        """Call ``callback``; on any error close the stream and re-raise."""
        try:
            callback(*args, **kwargs)
        except BaseException:
            self.close()
            raise

    def read_bytes(self, num_bytes):
        """Return a future resolving to exactly ``num_bytes`` bytes."""
        f = self._set_read_future()
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._try_inline_read()
        return f

    def write(self, data):
        """Queue ``data`` for sending.

        Returns:
            A future resolved with ``None`` once everything queued up to
            and including ``data`` has been sent.

        Raises:
            Exception: if the stream is closed or the write buffer limit
                would be exceeded.
        """
        self._check_closed()
        if data:
            if (self.max_write_buffer_size is not None and
                    len(self._write_buffer) + len(data) > self.max_write_buffer_size):
                raise Exception("Reached maximum write buffer size")
            self._write_buffer.append(data)
            self._total_write_index += len(data)
        future = Future()
        future.add_done_callback(lambda f: f.exception())
        self._write_futures.append((self._total_write_index, future))
        # Try to send immediately; fall back to the IO loop for the rest.
        self._handle_write()
        if self._write_buffer:
            self._add_io_state(selectors.EVENT_WRITE)
        self._maybe_add_error_listener()
        return future

    def _consume(self, loc):
        """Remove and return the first ``loc`` bytes of the read buffer."""
        if loc == 0:
            return b""
        _merge_prefix(self._read_buffer, loc)
        self._read_buffer_size -= loc
        return self._read_buffer.popleft()

    def set_close_callback(self, callback):
        """Set the callable to run when the stream is closed."""
        self._close_callback = callback

    def reading(self):
        """Return True while a read is pending."""
        return self._read_future is not None

    def writing(self):
        """Return True while unsent data is queued."""
        return bool(self._write_buffer)

    def _is_connreset(self, exc):
        """Return True if ``exc`` is a socket/IO error with a reset errno."""
        return (isinstance(exc, (socket.error, IOError)) and
                errno_from_exception(exc) in _ERRNO_CONNRESET)
def errno_from_exception(e):
    """Extract an error number from exception ``e``.

    Uses the ``errno`` attribute when the exception has one, otherwise the
    first element of ``args``; returns ``None`` when ``args`` is empty.
    """
    if hasattr(e, 'errno'):
        return e.errno  # type: ignore
    return e.args[0] if e.args else None
import selectors
import socket
from version2 import gen
from version2.ioloop import IOLoop
from version2.iostream import IOStream
class Server(object):
    """Base TCP server: accepts connections on the IO loop and passes each
    one, wrapped in an ``IOStream``, to ``handle_stream``."""

    def __init__(self):
        self.server_sock = None

    def start(self, address=('127.0.0.1', 5055)):
        """Create the listening socket and register it with the IO loop.

        Args:
            address: ``(host, port)`` to bind to.
        """
        self.server_sock = socket.socket()
        self.server_sock.setblocking(0)
        self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_sock.bind(address)
        self.server_sock.listen(128)
        IOLoop.instance().add_handler(self.server_sock.fileno(), selectors.EVENT_READ, self.on_accept)

    def on_accept(self, fd, events):
        """IO loop handler: accept one connection and start handling it."""
        try:
            conn, address = self.server_sock.accept()
        except OSError as e:
            print(e)
        else:
            # Whether an accepted socket inherits non-blocking mode is
            # OS-dependent; the stream must never block the loop.
            conn.setblocking(False)
            stream = IOStream(conn)
            future = self.handle_stream(stream)
            # Retrieve the result so handler errors surface in the loop.
            IOLoop.instance().add_future(gen.convert_yielded(future), lambda f: f.result())

    def handle_stream(self, stream):
        """Handle one connection; subclasses must override this."""
        raise NotImplementedError

    def close(self):
        """Stop listening; does nothing if the server is not started."""
        if self.server_sock:
            # The handler was registered under the fileno, and the fileno
            # is only available while the socket is still open.
            IOLoop.instance().remove_handler(self.server_sock.fileno())
            self.server_sock.close()
            self.server_sock = None
class EchoServer(Server):
    """Server that sends every received message body back to the sender.

    A message is a 4-byte big-endian length followed by that many bytes;
    only the body is written back.
    """

    @gen.coroutine
    def handle_stream(self, stream):
        """Loop forever: read a length header, read the body, echo it."""
        stream.set_close_callback(self.on_close)
        while True:
            header = yield stream.read_bytes(4)
            payload_size = int.from_bytes(header, byteorder='big')
            payload = yield stream.read_bytes(payload_size)
            yield stream.write(payload)

    def on_close(self):
        """Close callback for a stream: report it on stdout."""
        print('closed')
# Script entry: create the echo server and start listening.
server = EchoServer()
server.start()
# Hand control to the shared event loop; its start() loops indefinitely,
# so nothing placed after this line is reached.
IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment