Created
February 10, 2010 16:52
-
-
Save weaver/300540 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, socket, select, errno | |
from tornado import ioloop, iostream | |
### Main Program
def main(kind=None):
    """Run the echo client/server demonstration on one poller class.

    `kind` is the name of the poller class to exercise, 'Original' or
    'Modified'.  Any other value, including no value at all, prints the
    usage message and exits.  A server and a client are attached to the
    same IOLoop on 127.0.0.1:9000 and the loop is then started.
    """
    valid_kinds = ('Original', 'Modified')
    if not (kind and kind in valid_kinds):
        usage()

    # Print the numeric event masks first so later trace lines can be decoded.
    read_mask = IOLoop.READ | IOLoop.ERROR
    write_mask = IOLoop.WRITE | IOLoop.ERROR
    both_mask = IOLoop.READ | IOLoop.WRITE | IOLoop.ERROR
    debug('start: READ=%d WRITE=%d READ/WRITE=%d' % (
        read_mask, write_mask, both_mask))

    # The chosen poller class is only instantiated where select.kqueue
    # exists; otherwise None is handed to the IOLoop constructor.
    if hasattr(select, 'kqueue'):
        impl = globals()[kind]()
    else:
        impl = None
    io = IOLoop(impl)

    # The returned sockets are kept in locals but not used further.
    listener = tcp_server(server, bind('127.0.0.1', 9000), io)
    peer = tcp_client(client, connect('127.0.0.1', 9000), io)
    io.start()
def usage():
    """Print the command-line synopsis to stdout and exit with status 1."""
    # An explicit stream write replaces the Python 2-only print statement;
    # the emitted text (including the trailing newline) is unchanged.
    sys.stdout.write('usage: %s Original|Modified\n' % sys.argv[0])
    sys.exit(1)
### Application
def server(stream):
    """Serve the demo line protocol on `stream`.

    Every newline-terminated line received is stripped and answered with
    a single line: 'QUIT' is answered with 'GOODBYE', 'RESET' with
    'PROCEED', and anything else with 'ECHO ' followed by the repr of the
    line.  After each reply the server waits for the next line.
    """
    fd = stream.socket.fileno()
    fixed_replies = {'QUIT': 'GOODBYE', 'RESET': 'PROCEED'}

    def on_data(chunk):
        # Drop the delimiter and surrounding whitespace before dispatching.
        handle_line(chunk.strip())

    def wait():
        stream.read_until('\n', on_data)

    def write(data):
        debug('server: writing %r (fd=%d)' % (data, fd))
        stream.write('%s\n' % data)
        wait()

    def handle_line(line):
        debug('server: got line %r (fd=%d)' % (line, fd))
        if line in fixed_replies:
            write(fixed_replies[line])
        else:
            write('ECHO %r' % line)

    debug('server: begin (fd=%d)' % fd)
    wait()
def client(stream):
    """Drive the demo conversation from the client side of `stream`.

    The client opens by sending 'hello'.  It then reacts to each line it
    reads: "ECHO 'hello'" makes it send 'RESET', 'PROCEED' makes it send
    'QUIT', 'GOODBYE' makes it close the stream, and any other line just
    makes it wait for another one.
    """
    fd = stream.socket.fileno()
    next_command = {"ECHO 'hello'": 'RESET', 'PROCEED': 'QUIT'}

    def on_data(chunk):
        handle_line(chunk.strip())

    def wait():
        stream.read_until('\n', on_data)

    def write(data, callback=None):
        debug('client: writing %r (fd=%d)' % (data, fd))
        stream.write('%s\n' % data, callback)
        wait()

    def handle_line(line):
        debug('client: got line %r (fd=%d)' % (line, fd))
        if line == 'GOODBYE':
            stream.close()
            return
        command = next_command.get(line)
        if command is None:
            # Unrecognized line: keep listening without replying.
            wait()
        else:
            write(command)

    debug('client: begin (fd=%d)' % fd)
    write('hello')
### TCP
def tcp_server(handle, sock, io):
    """Accept connections on `sock` and hand each one to `handle`.

    An accept callback is installed on `io` for READ events on the
    listening socket.  Each accepted connection is wrapped in an IOStream
    bound to `io` and passed to `handle`.  Returns `sock`.
    """
    def accept(fd, events):
        # Drain the accept queue: keep accepting until the non-blocking
        # socket reports that nothing more is pending.
        while True:
            try:
                conn, _peer = sock.accept()
            except socket.error as exc:
                # exc.args[0] is the errno; indexing the exception object
                # directly is deprecated and unavailable on Python 3.
                if exc.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            # Kept outside the try block so that a socket error raised
            # while setting up the handler is never mistaken for an empty
            # accept queue.
            debug('tcp_server: accepted (fd=%d)' % conn.fileno())
            handle(IOStream(conn, io))

    debug('tcp_server: (fd=%d)' % sock.fileno())
    io.set_handler(sock.fileno(), accept, io.READ)
    return sock
def bind(addr, port):
    """Create a non-blocking TCP socket listening on (`addr`, `port`).

    `port` may be anything int() accepts.  SO_REUSEADDR is set and the
    listen backlog is 128.  Returns the listening socket.

    Any exception raised during setup is propagated after the
    half-configured socket has been closed, so no descriptor is leaked.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind((addr, int(port)))
        sock.listen(128)
    except Exception:
        # Release the descriptor before re-raising the original error.
        sock.close()
        raise
    return sock
def tcp_client(handle, sock, io):
    """Wrap `sock` in an IOStream bound to `io` and pass it to `handle`.

    Returns `sock`.
    """
    descriptor = sock.fileno()
    debug('tcp_client: fd=%d' % descriptor)
    stream = IOStream(sock, io)
    handle(stream)
    return sock
def connect(addr, port):
    """Start a non-blocking TCP connection to (`addr`, `port`).

    `port` may be anything int() accepts.  EINPROGRESS from the
    non-blocking connect is treated as success, so the returned socket
    may still be connecting.

    Any other exception is propagated after the socket has been closed,
    so a failed attempt does not leak a descriptor.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sock.setblocking(0)
        try:
            sock.connect((addr, int(port)))
        except socket.error as exc:
            # exc.args[0] is the errno; indexing the exception object
            # directly is deprecated and unavailable on Python 3.
            if exc.args[0] != errno.EINPROGRESS:
                raise
    except Exception:
        # Release the descriptor before re-raising the original error.
        sock.close()
        raise
    return sock
### IO
def debug(message):
    """Write `message` followed by a newline to standard error."""
    # An explicit write replaces the Python 2-only `print >>` form.  The
    # one-element tuple keeps %-formatting safe if `message` is a tuple.
    sys.stderr.write('%s\n' % (message,))
class IOLoop(ioloop.IOLoop):
    """Tornado IOLoop whose add_handler() is routed through set_handler()."""

    def add_handler(self, fd, handler, events):
        """Delegate to set_handler() and return its result."""
        return self.set_handler(fd, handler, events)

    def set_handler(self, fd, handler, events):
        """Install `handler` for `events` on `fd`.

        If `fd` already has a handler it is replaced and the poller is
        asked to modify the registration; otherwise the descriptor is
        registered.  ERROR is always added to the requested events.
        """
        already_known = fd in self._handlers
        self._handlers[fd] = handler
        mask = events | self.ERROR
        if already_known:
            self._impl.modify(fd, mask)
        else:
            self._impl.register(fd, mask)
class IOStream(iostream.IOStream):
    """Tornado IOStream that traces each _handle_events() call.

    Apart from the extra debug line it defers entirely to the parent
    class.
    """

    def _handle_events(self, fd, events):
        """Log the event mask and pending work, then delegate upward."""
        wants_write = bool(self._write_buffer)
        wants_read = bool(self._read_bytes or self._read_delimiter)
        debug('IOStream: handle fd=%d events=%d want-write=%d want-read=%d'
              % (fd, events, wants_write, wants_read))
        return super(IOStream, self)._handle_events(fd, events)
class Original(ioloop._KQueue):
    """Tornado's kqueue poller with trace output on every operation.

    NOTE(review): kqueue filter identifiers do not look like bit flags
    (the trace recorded with this script shows them as -1 and -2), yet
    poll() tests them with `&`, as in the code it was copied from.  That
    is presumably the behavior this script exists to demonstrate, so it
    is deliberately left as-is.
    """

    def register(self, fd, events):
        """Trace the registration, then defer to the parent class."""
        shown_filter = kq_filter(events)
        debug('register: fd=%d events=%d filter=%d'
              % (fd, events, shown_filter))
        super(Original, self).register(fd, events)

    def modify(self, fd, events):
        """Trace the modification, then defer to the parent class."""
        shown_filter = kq_filter(events)
        debug('modify: fd=%d events=%d filter=%d'
              % (fd, events, shown_filter))
        super(Original, self).modify(fd, events)

    def unregister(self, fd):
        """Trace the removal, then defer to the parent class."""
        debug('unregister: fd=%d' % fd)
        super(Original, self).unregister(fd)

    def poll(self, timeout):
        """Wait up to `timeout` and return a list of (fd, flags) pairs.

        Copied from Tornado's ioloop without changes other than the two
        debug statements.
        """
        raw = self._kqueue.control(None, 1000, timeout)
        debug('\npoll: kevents=%r' % [(e.ident, e.filter) for e in raw])
        ready = []
        for ev in raw:
            mask = 0
            if ev.filter & select.KQ_FILTER_READ:
                mask |= IOLoop.READ
            if ev.filter & select.KQ_FILTER_WRITE:
                mask |= IOLoop.WRITE
            if ev.flags & select.KQ_EV_ERROR:
                mask |= IOLoop.ERROR
            ready.append((ev.ident, mask))
        debug('poll: events=%r' % ready)
        return ready
def kq_filter(events):
    """Return the kqueue filter value shown in Original's trace lines.

    The WRITE filter is OR-ed in when `events` contains IOLoop.WRITE.
    The READ filter is OR-ed in when `events` contains IOLoop.READ, or
    when nothing has been selected so far.
    """
    wants_write = events & IOLoop.WRITE
    wants_read = events & IOLoop.READ
    result = select.KQ_FILTER_WRITE if wants_write else 0
    if wants_read or result == 0:
        result |= select.KQ_FILTER_READ
    return result
class Modified(object):
    """kqueue poller that manages the READ and WRITE filters separately.

    `_active` maps each registered fd to the event mask most recently
    applied for it, so that a change only touches the filters that
    actually differ.
    """

    ## shortcuts
    if hasattr(select, 'kqueue'):
        READ = select.KQ_FILTER_READ
        WRITE = select.KQ_FILTER_WRITE
        DELETE = select.KQ_EV_DELETE
        ADD = select.KQ_EV_ADD
        ENABLE = select.KQ_EV_ENABLE
        DISABLE = select.KQ_EV_DISABLE

    def __init__(self):
        self._kq = select.kqueue()
        self._active = {}

    def register(self, fd, events):
        """Start watching `fd` for `events`."""
        debug('register: fd=%d events=%d' % (fd, events))
        applied = self.control(fd, events, 0, self.ADD)
        self._active[fd] = applied

    def modify(self, fd, events):
        """Change the events watched on an already registered `fd`."""
        previous = self._active[fd]
        debug('modify: fd=%d new=%d orig=%d' % (fd, events, previous))
        self._active[fd] = self.control(fd, events, previous, self.ADD)

    def unregister(self, fd):
        """Stop watching `fd` and forget its recorded mask."""
        debug('unregister: fd=%d' % fd)
        last_mask = self._active.pop(fd)
        self.control(fd, last_mask, 0, self.DELETE)

    def control(self, fd, new, old, flags):
        """Apply the difference between the `old` and `new` event masks.

        A filter present in `new` but not in `old` is submitted with
        `flags`; a filter present in `old` but not in `new` is deleted.
        Returns the mask actually applied, which has READ forced on
        whenever WRITE is not requested.
        """
        if old and old == new:
            ## Try to bail out early if there's nothing to change.
            return new
        if not new & IOLoop.WRITE:
            ## Force a READ if there's not a WRITE.
            new |= IOLoop.READ

        # WRITE is handled before READ, one kqueue call per changed filter.
        pairs = ((IOLoop.WRITE, self.WRITE), (IOLoop.READ, self.READ))
        for bit, kfilter in pairs:
            wanted = new & bit
            had = old & bit
            if wanted and not had:
                self._kq.control([select.kevent(fd, kfilter, flags)], 0)
            elif had and not wanted:
                self._kq.control(
                    [select.kevent(fd, kfilter, self.DELETE)], 0)
        return new

    def poll(self, timeout):
        """Wait up to `timeout` and return a list of (fd, flags) pairs.

        Each kevent yields exactly one flag: ERROR if its error flag is
        set, otherwise READ or WRITE according to an equality test on
        its filter, otherwise 0.
        """
        raw = self._kq.control(None, 1000, timeout)
        debug('\npoll: kevents=%r' % [(e.ident, e.filter) for e in raw])
        ready = []
        for ev in raw:
            if ev.flags & select.KQ_EV_ERROR:
                mask = IOLoop.ERROR
            elif ev.filter == select.KQ_FILTER_READ:
                mask = IOLoop.READ
            elif ev.filter == select.KQ_FILTER_WRITE:
                mask = IOLoop.WRITE
            else:
                mask = 0
            ready.append((ev.ident, mask))
        debug('poll: events=%r' % ready)
        return ready
if __name__ == '__main__':
    # main() accepts at most one positional argument.  Send surplus
    # command-line arguments to the usage message (which exits) instead
    # of letting the call fail with a TypeError traceback.
    cli_args = sys.argv[1:]
    if len(cli_args) > 1:
        usage()
    main(*cli_args)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%python ~/tmp/kqueue-write.py Modified | |
start: READ=8217 WRITE=8220 READ/WRITE=8221 | |
register: fd=4 events=8220 | |
tcp_server: (fd=6) | |
register: fd=6 events=8217 | |
tcp_client: fd=7 | |
register: fd=7 events=8216 | |
client: begin (fd=7) | |
client: writing 'hello' (fd=7) | |
modify: fd=7 new=8220 orig=8217 | |
modify: fd=7 new=8221 orig=8220 | |
poll: kevents=[(6L, -1), (7L, -2)] | |
poll: events=[(6L, 1), (7L, 4)] | |
tcp_server: accepted (fd=8) | |
register: fd=8 events=8216 | |
server: begin (fd=8) | |
modify: fd=8 new=8217 orig=8217 | |
IOStream: handle fd=7 events=4 want-write=1 want-read=1 | |
modify: fd=7 new=8217 orig=8221 | |
poll: kevents=[(8L, -1)] | |
poll: events=[(8L, 1)] | |
IOStream: handle fd=8 events=1 want-write=0 want-read=1 | |
server: got line 'hello' (fd=8) | |
server: writing "ECHO 'hello'" (fd=8) | |
modify: fd=8 new=8221 orig=8217 | |
poll: kevents=[(8L, -2)] | |
poll: events=[(8L, 4)] | |
IOStream: handle fd=8 events=4 want-write=1 want-read=1 | |
modify: fd=8 new=8217 orig=8221 | |
poll: kevents=[(7L, -1)] | |
poll: events=[(7L, 1)] | |
IOStream: handle fd=7 events=1 want-write=0 want-read=1 | |
client: got line "ECHO 'hello'" (fd=7) | |
client: writing 'RESET' (fd=7) | |
modify: fd=7 new=8221 orig=8217 | |
poll: kevents=[(7L, -2)] | |
poll: events=[(7L, 4)] | |
IOStream: handle fd=7 events=4 want-write=1 want-read=1 | |
modify: fd=7 new=8217 orig=8221 | |
poll: kevents=[(8L, -1)] | |
poll: events=[(8L, 1)] | |
IOStream: handle fd=8 events=1 want-write=0 want-read=1 | |
server: got line 'RESET' (fd=8) | |
server: writing 'PROCEED' (fd=8) | |
modify: fd=8 new=8221 orig=8217 | |
poll: kevents=[(8L, -2)] | |
poll: events=[(8L, 4)] | |
IOStream: handle fd=8 events=4 want-write=1 want-read=1 | |
modify: fd=8 new=8217 orig=8221 | |
poll: kevents=[(7L, -1)] | |
poll: events=[(7L, 1)] | |
IOStream: handle fd=7 events=1 want-write=0 want-read=1 | |
client: got line 'PROCEED' (fd=7) | |
client: writing 'QUIT' (fd=7) | |
modify: fd=7 new=8221 orig=8217 | |
poll: kevents=[(7L, -2)] | |
poll: events=[(7L, 4)] | |
IOStream: handle fd=7 events=4 want-write=1 want-read=1 | |
modify: fd=7 new=8217 orig=8221 | |
poll: kevents=[(8L, -1)] | |
poll: events=[(8L, 1)] | |
IOStream: handle fd=8 events=1 want-write=0 want-read=1 | |
server: got line 'QUIT' (fd=8) | |
server: writing 'GOODBYE' (fd=8) | |
modify: fd=8 new=8221 orig=8217 | |
poll: kevents=[(8L, -2)] | |
poll: events=[(8L, 4)] | |
IOStream: handle fd=8 events=4 want-write=1 want-read=1 | |
modify: fd=8 new=8217 orig=8221 | |
poll: kevents=[(7L, -1)] | |
poll: events=[(7L, 1)] | |
IOStream: handle fd=7 events=1 want-write=0 want-read=1 | |
client: got line 'GOODBYE' (fd=7) | |
unregister: fd=7 | |
poll: kevents=[(8L, -1)] | |
poll: events=[(8L, 1)] | |
IOStream: handle fd=8 events=1 want-write=0 want-read=1 | |
unregister: fd=8 | |
poll: kevents=[] | |
poll: events=[] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%python ~/tmp/kqueue-write.py Original | |
start: READ=8217 WRITE=8220 READ/WRITE=8221 | |
register: fd=4 events=8220 filter=-2 | |
tcp_server: (fd=6) | |
register: fd=6 events=8217 filter=-1 | |
tcp_client: fd=7 | |
register: fd=7 events=8216 filter=-1 | |
client: begin (fd=7) | |
client: writing 'hello' (fd=7) | |
modify: fd=7 events=8220 filter=-2 | |
unregister: fd=7 | |
register: fd=7 events=8220 filter=-2 | |
modify: fd=7 events=8221 filter=-1 | |
unregister: fd=7 | |
register: fd=7 events=8221 filter=-1 | |
poll: kevents=[(6L, -1)] | |
poll: events=[(6L, 5)] | |
tcp_server: accepted (fd=8) | |
register: fd=8 events=8216 filter=-1 | |
server: begin (fd=8) | |
modify: fd=8 events=8217 filter=-1 | |
unregister: fd=8 | |
register: fd=8 events=8217 filter=-1 | |
poll: kevents=[] | |
poll: events=[] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment