Skip to content

Instantly share code, notes, and snippets.

@agronholm
Created January 15, 2011 04:50
Show Gist options
  • Save agronholm/780706 to your computer and use it in GitHub Desktop.
Save agronholm/780706 to your computer and use it in GitHub Desktop.
reactor.py
from Queue import Queue, Empty
from socket import (socket, error, AF_INET, SOCK_STREAM, SOCK_DGRAM,
SOL_SOCKET, SOL_TCP, SO_ERROR, TCP_NODELAY)
from errno import EINPROGRESS
from select import select
from threading import current_thread
from concurrent.futures import Future
from functools import partial
import sys
import os
from ioloop.stream import IOStream
class ReactorError(Exception):
    """Raised when the reactor is used in a way its current state forbids."""
class SelectReactor(object):
    """Event loop that multiplexes sockets with ``select.select``.

    Sockets are registered together with a zero-argument callback that is
    invoked from :meth:`start` whenever ``select`` reports the socket as
    readable, writable or in an exceptional state.  Work can be handed to the
    loop from any thread through :meth:`call_in_reactor`; a loopback UDP
    socket pair is used to wake ``select`` up when a call has been queued.

    :param thread_pool: executor used by :meth:`call_in_thread`
    :param process_pool: executor used by :meth:`call_in_process`
    """

    _shutdown = False
    _thread = None

    def __init__(self, thread_pool=None, process_pool=None):
        self.thread_pool = thread_pool
        self.process_pool = process_pool
        self._call_queue = Queue()

        # Wake-up channel: one byte is sent for every queued call so that a
        # blocked select() returns and _process_calls() gets to run.
        self._notify_recv_socket = socket(AF_INET, SOCK_DGRAM)
        self._notify_recv_socket.bind(('127.0.0.1', 0))
        self._notify_send_socket = socket(AF_INET, SOCK_DGRAM)
        self._notify_send_socket.connect(
            self._notify_recv_socket.getsockname())

        self._read_list = [self._notify_recv_socket]
        self._read_map = {self._notify_recv_socket: self._process_calls}
        self._write_list = []
        self._write_map = {}
        self._exc_set = set()

    def _process_calls(self):
        """Consume one wake-up notification and run every queued call."""
        self._notify_recv_socket.recv(4096)
        while True:
            try:
                func, args, kwargs, future = self._call_queue.get(False)
            except Empty:
                return

            # Skip calls whose future was cancelled before we got to them
            if not future.set_running_or_notify_cancel():
                continue

            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(result)

    def _add_read_socket(self, sock, callback):
        """Invoke ``callback()`` whenever ``sock`` becomes readable."""
        assert sock not in self._read_list
        self._read_list.append(sock)
        self._read_map[sock] = callback
        self._exc_set.add(sock)

    def _add_write_socket(self, sock, callback):
        """Invoke ``callback()`` whenever ``sock`` becomes writable."""
        assert sock not in self._write_list
        # _write_list is a list, so the socket has to be appended
        self._write_list.append(sock)
        self._write_map[sock] = callback
        self._exc_set.add(sock)

    def _remove_read_socket(self, sock):
        """Stop watching ``sock`` for readability."""
        self._read_list.remove(sock)
        del self._read_map[sock]
        # Keep watching for exceptional conditions while a write is pending
        if sock not in self._write_list:
            self._exc_set.discard(sock)

    def _remove_write_socket(self, sock):
        """Stop watching ``sock`` for writability."""
        self._write_list.remove(sock)
        del self._write_map[sock]
        # Keep watching for exceptional conditions while a read is pending
        if sock not in self._read_list:
            self._exc_set.discard(sock)

    def _handle_executor_return(self, outer, inner):
        """Copy the outcome of the executor future ``inner`` to ``outer``.

        Runs as a done-callback of ``inner``; the outcome is delivered
        through :meth:`call_in_reactor` so that ``outer`` is completed by
        the reactor's call processing.
        """
        exc = inner.exception()
        if exc is not None:
            self.call_in_reactor(outer.set_exception, exc)
        else:
            # The value lives on the finished inner future; asking the still
            # pending outer future for it would block.
            self.call_in_reactor(outer.set_result, inner.result())

    def _submit_to_pool(self, pool, kind, func, args, kwargs):
        """Run ``func(*args, **kwargs)`` in ``pool`` and return a future."""
        if pool is None:
            raise ReactorError('No %s pool has been configured' % kind)
        outer = Future()
        outer.set_running_or_notify_cancel()
        inner = pool.submit(func, *args, **kwargs)
        inner.add_done_callback(partial(self._handle_executor_return, outer))
        return outer

    @property
    def started(self):
        """``True`` once :meth:`start` was entered and no shutdown ran."""
        return self._thread is not None and not self._shutdown

    def start(self):
        """Run the event loop in the calling thread until shut down.

        :raises ReactorError: if the reactor has already been started
        """
        if self.started:
            raise ReactorError('The reactor has already been started')

        self._thread = current_thread()
        try:
            while not self._shutdown:
                rd, wr, ex = select(self._read_list, self._write_list,
                                    self._exc_set)
                # Callbacks are looked up at dispatch time because an earlier
                # callback in the same round may have deregistered a socket.
                for sock in rd:
                    callback = self._read_map.get(sock)
                    if callback is not None:
                        callback()
                for sock in wr:
                    callback = self._write_map.get(sock)
                    if callback is not None:
                        callback()
                for sock in ex:
                    # Let the pending operations run so they hit the error
                    read_cb = self._read_map.get(sock)
                    if read_cb is not None:
                        read_cb()
                    write_cb = self._write_map.get(sock)
                    if write_cb is not None:
                        write_cb()
        finally:
            # Cleanup (also performed when a callback raised)
            self._notify_send_socket.close()
            self._notify_recv_socket.close()
            del self._read_list
            del self._read_map
            del self._write_list
            del self._write_map
            del self._exc_set

    def shutdown(self):
        """Ask the event loop to stop; may be called from any thread."""
        self.call_in_reactor(setattr, self, '_shutdown', True)

    def call_in_reactor(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by the reactor.

        When called from the reactor thread the queue is processed right
        away, so the returned future is already complete.

        :return: a future for the call's result or exception
        """
        future = Future()
        self._call_queue.put((func, args, kwargs, future))
        self._notify_send_socket.send(b'\0')
        if current_thread() == self._thread:
            self._process_calls()
        return future

    def call_in_thread(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in the thread pool.

        :return: a future completed through the reactor's call queue
        :raises ReactorError: if no thread pool was given to the reactor
        """
        return self._submit_to_pool(self.thread_pool, 'thread', func, args,
                                    kwargs)

    def call_in_process(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in the process pool.

        :return: a future completed through the reactor's call queue
        :raises ReactorError: if no process pool was given to the reactor
        """
        return self._submit_to_pool(self.process_pool, 'process', func, args,
                                    kwargs)

    def listen(self, addr, backlog, callback, family=AF_INET, type=SOCK_STREAM,
               proto=0):
        """Listen on ``addr`` and call ``callback(stream)`` per connection.

        :param addr: address passed to ``socket.bind``
        :param backlog: value passed to ``socket.listen``
        :param callback: called with an ``IOStream`` for each accepted socket
        """
        def _accept():
            sock = serv_sock.accept()[0]
            sock.setblocking(False)
            sock.setsockopt(SOL_TCP, TCP_NODELAY, 1)
            callback(IOStream(sock, self))

        serv_sock = socket(family, type, proto)
        try:
            serv_sock.setblocking(False)
            serv_sock.bind(addr)
            serv_sock.listen(backlog)
        except Exception:
            # Do not leak the descriptor when the address cannot be used
            serv_sock.close()
            raise
        self._add_read_socket(serv_sock, _accept)

    def connect(self, addr, family=AF_INET, type=SOCK_STREAM, proto=0):
        """Start a non-blocking connection attempt to ``addr``.

        :return: a future that yields an ``IOStream`` on success or carries
            the socket error on failure
        """
        def _connect():
            # The socket became writable: the attempt has finished one way
            # or the other, SO_ERROR tells which.
            self._remove_write_socket(sock)
            errcode = sock.getsockopt(SOL_SOCKET, SO_ERROR)
            if errcode:
                sock.close()
                future.set_exception(error(errcode, os.strerror(errcode)))
            else:
                future.set_result(IOStream(sock, self))

        future = Future()
        future.set_running_or_notify_cancel()
        sock = None
        try:
            sock = socket(family, type, proto)
            sock.setblocking(False)
            errcode = sock.connect_ex(addr)
            # 0 means the connection was established immediately, which is
            # just as fine as "in progress"
            if errcode not in (0, EINPROGRESS):
                raise error(errcode, os.strerror(errcode))
        except Exception as exc:
            if sock is not None:
                sock.close()
            future.set_exception(exc)
        else:
            self._add_write_socket(sock, _connect)
        return future
# Determine the best default choice for the current platform.
# SelectReactor is the only implementation visible in this module, so it is
# used unconditionally.
Reactor = SelectReactor
from ioloop.reactor import Reactor
from ioloop.util import inline_callbacks
@inline_callbacks
def serve(stream):
    """Store everything received on ``stream`` in ``uploaded.dat``.

    Data is read in chunks of up to 4096 bytes until a read returns no data.
    The stream is closed when the transfer ends or fails, so the accepted
    socket is not leaked.
    """
    try:
        data = yield stream.read(4096)
        with open('uploaded.dat', 'wb') as f:
            while data:
                f.write(data)
                data = yield stream.read(4096)
    finally:
        stream.close()
# Accept connections on 127.0.0.1:7008 with a listen backlog of 5; serve() is
# called with an IOStream for every accepted connection.
reactor = Reactor()
reactor.listen(('127.0.0.1', 7008), 5, serve)
# Runs the event loop in this thread until the reactor is shut down.
reactor.start()
import sys
from ioloop.reactor import Reactor
from ioloop.util import inline_callbacks
@inline_callbacks
def upload():
    """Send the file named by ``sys.argv[1]`` to 127.0.0.1:7008.

    The file is sent in chunks of up to 4096 bytes.  The stream is closed
    afterwards so that the receiving side sees the end of the data.
    """
    stream = yield reactor.connect(('127.0.0.1', 7008))
    try:
        with open(sys.argv[1], 'rb') as f:
            data = f.read(4096)
            while data:
                sent = yield stream.write(data)
                print('sent %d bytes' % sent)
                # Fetch the next chunk; without this the first chunk would
                # be sent over and over again.
                data = f.read(4096)
    finally:
        stream.close()
# The path of the file to upload must be given as the first command-line
# argument.  NOTE(review): an assert is skipped when Python runs with -O.
assert len(sys.argv) > 1
reactor = Reactor()
# Queued before start(): the call is executed once the event loop is running.
reactor.call_in_reactor(upload)
reactor.start()
from concurrent.futures import Future
from io import BytesIO
from socket import MSG_PEEK
import sys
class StreamException(Exception):
    """Raised when a stream is used in a way its current state forbids."""
class IOStream(object):
__slots__ = '_socket', '_reactor', '_read_future', '_write_future'
def __init__(self, socket, reactor):
self._socket = socket
self._reactor = reactor
self._read_future = None
self._write_future = None
def _read_result(self, result=None, exception=None):
self._reactor._remove_read_socket(self._socket)
future = self._read_future
del self._read_future
if exception:
future.set_exception(sys.exc_info()[1])
else:
future.set_result(result)
def _write_result(self, result=None, exception=None):
self._reactor._remove_write_socket(self._socket)
future = self._write_future
del self._write_future
if exception:
future.set_exception(sys.exc_info()[1])
else:
future.set_result(result)
def read(self, num_bytes):
def _read():
try:
data = self._socket.recv(bytes_left[0])
except Exception:
self._read_result(exception=sys.exc_info()[1])
return
buffer.write(data)
bytes_left[0] -= len(data)
if len(data) == 0 or bytes_left[0] == 0:
self._read_result(buffer.getvalue())
if self._read_future:
raise StreamException('Already reading from this socket')
bytes_left = [num_bytes]
buffer = BytesIO()
self._read_future = Future()
self._read_future.set_running_or_notify_cancel()
self._reactor._add_read_socket(self._socket, _read)
return self._read_future
def read_until(self, delimiter, read_chunk=8192, max_buffer_size=104857600):
def _read():
try:
data = self._socket.recv(read_chunk, MSG_PEEK)
except Exception:
self._read_result(exception=sys.exc_info()[1])
if len(data) == 0:
self._read_result(buffer.getvalue())
pos = buffer.getvalue().index(delimiter)
if pos >= 0:
self._socket.recv(pos + len(delimiter))
buffer.write(data[:pos + 1])
self._read_result(buffer.getvalue())
else:
self._socket.recv(read_chunk)
buffer.write(data)
if self._read_future:
raise StreamException('Already reading from this socket')
buffer = BytesIO()
self._read_future = Future()
self._read_future.set_running_or_notify_cancel()
self._reactor._add_read_socket(self._socket, _read)
return self._read_future
def write(self, data):
def _write():
try:
written = self._socket.send(data[pos[0]:])
except Exception:
self._write_result(exception=sys.exc_info()[1])
else:
pos[0] += written
if written == 0 or pos[0] == len(data):
self._write_result(pos[0])
if self._write_future:
raise StreamException('Already writing to this socket')
pos = [0]
self._write_future = Future()
self._write_future.set_running_or_notify_cancel()
self._reactor._add_write_socket(self._socket, _write)
return self._write_future
def close(self):
self._socket.close()
from concurrent.futures import Future
from functools import wraps, partial
from types import GeneratorType
import sys
# Names exported by a star-import of this module.
__all__ = 'ReturnValue', 'inline_callbacks'
class ReturnValue(object):
    """Marker yielded by a generator to finish with a result.

    The wrapped value is available as the ``result`` attribute.
    """

    def __init__(self, result):
        self.result = result
def _inline_callbacks(g, future, temp_future=None):
    """Drive generator ``g`` and publish its outcome on ``future``.

    Each future yielded by the generator is waited for; its result is sent
    back into the generator, or its exception is thrown into it.  Any other
    yielded value is sent straight back.  Yielding a ``ReturnValue`` ends
    the run with the wrapped result.

    :param g: the generator to drive
    :param future: future receiving the final result or exception
    :param temp_future: the completed future the generator was waiting for,
        supplied when this function runs as that future's done-callback
    """
    exc = retval = None
    if temp_future is not None:
        exc = temp_future.exception()
        if exc is None:
            retval = temp_future.result()

    while True:
        try:
            if exc is not None:
                retval = g.throw(exc)
            else:
                retval = g.send(retval)
        except StopIteration as stop:
            # A Python 3 generator may finish with "return value"; a plain
            # end of the generator gives None as before.
            future.set_result(getattr(stop, 'value', None))
            return
        except Exception as err:
            future.set_exception(err)
            return

        exc = None
        if isinstance(retval, Future):
            if not retval.done():
                retval.add_done_callback(partial(_inline_callbacks, g, future))
                return
            # Already finished: continue in this loop instead of re-entering
            # through the callback, which would recurse once per future.
            exc = retval.exception()
            retval = None if exc is not None else retval.result()
        elif isinstance(retval, ReturnValue):
            future.set_result(retval.result)
            return
def inline_callbacks(func):
    """Decorator that makes ``func`` return a future.

    If calling ``func`` produces a generator, the generator is handed to
    ``_inline_callbacks`` together with the returned future.  Any other
    return value becomes the future's result right away.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        outcome = Future()
        outcome.set_running_or_notify_cancel()
        produced = func(*args, **kwargs)
        if isinstance(produced, GeneratorType):
            _inline_callbacks(produced, outcome)
        else:
            outcome.set_result(produced)
        return outcome

    return inner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment