Created
January 15, 2011 04:50
-
-
Save agronholm/780706 to your computer and use it in GitHub Desktop.
reactor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from Queue import Queue, Empty | |
from socket import (socket, error, AF_INET, SOCK_STREAM, SOCK_DGRAM, | |
SOL_SOCKET, SOL_TCP, SO_ERROR, TCP_NODELAY) | |
from errno import EINPROGRESS | |
from select import select | |
from threading import current_thread | |
from concurrent.futures import Future | |
from functools import partial | |
import sys | |
import os | |
from ioloop.stream import IOStream | |
class ReactorError(Exception): | |
pass | |
class SelectReactor(object): | |
_shutdown = False | |
_thread = None | |
def __init__(self, thread_pool=None, process_pool=None): | |
self.thread_pool = thread_pool | |
self.process_pool = process_pool | |
self._call_queue = Queue() | |
self._notify_recv_socket = socket(AF_INET, SOCK_DGRAM) | |
self._notify_recv_socket.bind(('127.0.0.1', 0)) | |
self._notify_send_socket = socket(AF_INET, SOCK_DGRAM) | |
self._notify_send_socket.connect(self._notify_recv_socket.getsockname()) | |
self._read_list = [self._notify_recv_socket] | |
self._read_map = {self._notify_recv_socket: self._process_calls} | |
self._write_list = [] | |
self._write_map = {} | |
self._exc_set = set() | |
def _process_calls(self): | |
self._notify_recv_socket.recv(4096) | |
while True: | |
try: | |
func, args, kwargs, future = self._call_queue.get(False) | |
except Empty: | |
return | |
if future.set_running_or_notify_cancel(): | |
try: | |
result = func(*args, **kwargs) | |
except Exception: | |
future.set_exception(sys.exc_info()[1]) | |
else: | |
future.set_result(result) | |
def _add_read_socket(self, sock, callback): | |
assert sock not in self._read_list | |
self._read_list.append(sock) | |
self._read_map[sock] = callback | |
self._exc_set.add(sock) | |
def _add_write_socket(self, sock, callback): | |
assert sock not in self._write_list | |
self._write_list.add(sock) | |
self._write_map[sock] = callback | |
self._exc_set.add(sock) | |
def _remove_read_socket(self, sock): | |
self._read_list.remove(sock) | |
del self._read_map[sock] | |
if not sock in self._write_list: | |
self._exc_set.remove(sock) | |
def _remove_write_socket(self, sock): | |
self._write_list.remove(sock) | |
del self._write_map[sock] | |
if not sock in self._read_list: | |
self._exc_set.remove(sock) | |
def _handle_executor_return(self, outer, inner): | |
exc = inner.exception() | |
if exc: | |
self.call_in_reactor(outer.set_exception, exc) | |
else: | |
self.call_in_reactor(outer.set_result, outer.result()) | |
@property | |
def started(self): | |
return self._thread is not None and not self._shutdown | |
def start(self): | |
if self.started: | |
raise ReactorError('The reactor has already been started') | |
self._thread = current_thread() | |
while not self._shutdown: | |
rd, wr, ex = select(self._read_list, self._write_list, | |
self._exc_set) | |
for sock in rd: | |
self._read_map[sock]() | |
for sock in wr: | |
self._write_map[sock]() | |
for sock in ex: | |
read_cb = self._read_map.get(sock) | |
write_cb = self._write_map.get(sock) | |
if read_cb: | |
read_cb() | |
if write_cb: | |
write_cb() | |
# Cleanup | |
self._notify_send_socket.close() | |
self._notify_recv_socket.close() | |
del self._read_list | |
del self._read_map | |
del self._write_list | |
del self._write_map | |
del self._exc_set | |
def shutdown(self): | |
self.call_in_reactor(setattr, self, '_shutdown', True) | |
def call_in_reactor(self, func, *args, **kwargs): | |
future = Future() | |
self._call_queue.put((func, args, kwargs, future)) | |
self._notify_send_socket.send(b'\0') | |
if current_thread() == self._thread: | |
self._process_calls() | |
return future | |
def call_in_thread(self, func, *args, **kwargs): | |
outer = Future() | |
outer.set_running_or_notify_cancel() | |
inner = self._thread_pool.submit(func, args, kwargs) | |
inner.add_done_callback(partial(self._handle_executor_return, outer)) | |
return outer | |
def call_in_process(self, func, *args, **kwargs): | |
outer = Future() | |
outer.set_running_or_notify_cancel() | |
inner = self._process_pool.submit(func, args, kwargs) | |
inner.add_done_callback(partial(self._handle_executor_return, outer)) | |
return outer | |
def listen(self, addr, backlog, callback, family=AF_INET, type=SOCK_STREAM, | |
proto=0): | |
def _accept(): | |
sock = serv_sock.accept()[0] | |
sock.setblocking(False) | |
sock.setsockopt(SOL_TCP, TCP_NODELAY, 1) | |
callback(IOStream(sock, self)) | |
serv_sock = socket(family, type, proto) | |
serv_sock.setblocking(False) | |
serv_sock.bind(addr) | |
serv_sock.listen(backlog) | |
self._add_read_socket(serv_sock, _accept) | |
def connect(self, addr, family=AF_INET, type=SOCK_STREAM, proto=0): | |
def _connect(): | |
self._remove_write_socket(sock) | |
errcode = sock.getsockopt(SOL_SOCKET, SO_ERROR) | |
if errcode: | |
future.set_exception(error(errcode, os.strerror(errcode))) | |
else: | |
future.set_result(IOStream(sock, self)) | |
future = Future() | |
future.set_running_or_notify_cancel() | |
try: | |
sock = socket(family, type, proto) | |
sock.setblocking(False) | |
errcode = sock.connect_ex(addr) | |
if errcode != EINPROGRESS: | |
raise error(errcode, os.strerror(errcode)) | |
except Exception: | |
future.set_exception(sys.exc_info()[1]) | |
else: | |
self._add_write_socket(sock, _connect) | |
return future | |
# Determine the best default choice for the current platform | |
Reactor = SelectReactor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ioloop.reactor import Reactor | |
from ioloop.util import inline_callbacks | |
@inline_callbacks | |
def serve(stream): | |
data = yield stream.read(4096) | |
with open('uploaded.dat', 'wb') as f: | |
while data: | |
f.write(data) | |
data = yield stream.read(4096) | |
reactor = Reactor() | |
reactor.listen(('127.0.0.1', 7008), 5, serve) | |
reactor.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from ioloop.reactor import Reactor | |
from ioloop.util import inline_callbacks | |
@inline_callbacks | |
def upload(): | |
stream = yield reactor.connect(('127.0.0.1', 7008)) | |
with open(sys.argv[1], 'rb') as f: | |
data = f.read(4096) | |
while data: | |
sent = yield stream.write(data) | |
print 'sent %d bytes' % sent | |
assert len(sys.argv) > 1 | |
reactor = Reactor() | |
reactor.call_in_reactor(upload) | |
reactor.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import Future | |
from io import BytesIO | |
from socket import MSG_PEEK | |
import sys | |
class StreamException(Exception): | |
pass | |
class IOStream(object): | |
__slots__ = '_socket', '_reactor', '_read_future', '_write_future' | |
def __init__(self, socket, reactor): | |
self._socket = socket | |
self._reactor = reactor | |
self._read_future = None | |
self._write_future = None | |
def _read_result(self, result=None, exception=None): | |
self._reactor._remove_read_socket(self._socket) | |
future = self._read_future | |
del self._read_future | |
if exception: | |
future.set_exception(sys.exc_info()[1]) | |
else: | |
future.set_result(result) | |
def _write_result(self, result=None, exception=None): | |
self._reactor._remove_write_socket(self._socket) | |
future = self._write_future | |
del self._write_future | |
if exception: | |
future.set_exception(sys.exc_info()[1]) | |
else: | |
future.set_result(result) | |
def read(self, num_bytes): | |
def _read(): | |
try: | |
data = self._socket.recv(bytes_left[0]) | |
except Exception: | |
self._read_result(exception=sys.exc_info()[1]) | |
return | |
buffer.write(data) | |
bytes_left[0] -= len(data) | |
if len(data) == 0 or bytes_left[0] == 0: | |
self._read_result(buffer.getvalue()) | |
if self._read_future: | |
raise StreamException('Already reading from this socket') | |
bytes_left = [num_bytes] | |
buffer = BytesIO() | |
self._read_future = Future() | |
self._read_future.set_running_or_notify_cancel() | |
self._reactor._add_read_socket(self._socket, _read) | |
return self._read_future | |
def read_until(self, delimiter, read_chunk=8192, max_buffer_size=104857600): | |
def _read(): | |
try: | |
data = self._socket.recv(read_chunk, MSG_PEEK) | |
except Exception: | |
self._read_result(exception=sys.exc_info()[1]) | |
if len(data) == 0: | |
self._read_result(buffer.getvalue()) | |
pos = buffer.getvalue().index(delimiter) | |
if pos >= 0: | |
self._socket.recv(pos + len(delimiter)) | |
buffer.write(data[:pos + 1]) | |
self._read_result(buffer.getvalue()) | |
else: | |
self._socket.recv(read_chunk) | |
buffer.write(data) | |
if self._read_future: | |
raise StreamException('Already reading from this socket') | |
buffer = BytesIO() | |
self._read_future = Future() | |
self._read_future.set_running_or_notify_cancel() | |
self._reactor._add_read_socket(self._socket, _read) | |
return self._read_future | |
def write(self, data): | |
def _write(): | |
try: | |
written = self._socket.send(data[pos[0]:]) | |
except Exception: | |
self._write_result(exception=sys.exc_info()[1]) | |
else: | |
pos[0] += written | |
if written == 0 or pos[0] == len(data): | |
self._write_result(pos[0]) | |
if self._write_future: | |
raise StreamException('Already writing to this socket') | |
pos = [0] | |
self._write_future = Future() | |
self._write_future.set_running_or_notify_cancel() | |
self._reactor._add_write_socket(self._socket, _write) | |
return self._write_future | |
def close(self): | |
self._socket.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import Future | |
from functools import wraps, partial | |
from types import GeneratorType | |
import sys | |
__all__ = 'ReturnValue', 'inline_callbacks' | |
class ReturnValue(object): | |
def __init__(self, result): | |
self.result = result | |
def _inline_callbacks(g, future, temp_future=None): | |
if temp_future: | |
exc = temp_future.exception() | |
if not exc: | |
retval = temp_future.result() | |
else: | |
exc = retval = None | |
while True: | |
try: | |
if exc: | |
retval = g.throw(exc) | |
else: | |
retval = g.send(retval) | |
except StopIteration: | |
future.set_result(None) | |
return | |
except Exception: | |
future.set_exception(sys.exc_info()[1]) | |
return | |
if isinstance(retval, Future): | |
retval.add_done_callback(partial(_inline_callbacks, g, future)) | |
return | |
elif isinstance(retval, ReturnValue): | |
future.set_result(retval.result) | |
return | |
exc = None | |
def inline_callbacks(func): | |
@wraps(func) | |
def inner(*args, **kwargs): | |
future = Future() | |
future.set_running_or_notify_cancel() | |
g = func(*args, **kwargs) | |
if not isinstance(g, GeneratorType): | |
future.set_result(g) | |
else: | |
_inline_callbacks(g, future) | |
return future | |
return inner |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment