Skip to content

Instantly share code, notes, and snippets.

@weaver
Created February 10, 2010 16:52
Show Gist options
  • Save weaver/300540 to your computer and use it in GitHub Desktop.
Save weaver/300540 to your computer and use it in GitHub Desktop.
import sys, socket, select, errno
from tornado import ioloop, iostream
### Main Program
def main(kind=None):
if not kind or kind not in ('Original', 'Modified'):
usage()
debug('start: READ=%d WRITE=%d READ/WRITE=%d' % (
IOLoop.READ | IOLoop.ERROR,
IOLoop.WRITE | IOLoop.ERROR,
IOLoop.READ | IOLoop.WRITE | IOLoop.ERROR
))
io = IOLoop(globals()[kind]() if hasattr(select, 'kqueue') else None)
S = tcp_server(server, bind('127.0.0.1', 9000), io)
C = tcp_client(client, connect('127.0.0.1', 9000), io)
io.start()
def usage():
print 'usage: %s Original|Modified' % sys.argv[0]
sys.exit(1)
### Application
def server(stream):
fd = stream.socket.fileno()
def handle_line(line):
debug('server: got line %r (fd=%d)' % (line, fd))
if line == 'QUIT':
write('GOODBYE')
elif line == 'RESET':
write('PROCEED')
else:
write('ECHO %r' % line)
def write(data):
debug('server: writing %r (fd=%d)' % (data, fd))
stream.write('%s\n' % data)
wait()
def wait():
stream.read_until('\n', lambda d: handle_line(d.strip()))
debug('server: begin (fd=%d)' % fd)
wait()
def client(stream):
fd = stream.socket.fileno()
def handle_line(line):
debug('client: got line %r (fd=%d)' % (line, fd))
if line == "ECHO 'hello'":
write('RESET')
elif line == 'PROCEED':
write('QUIT')
elif line == 'GOODBYE':
stream.close()
else:
wait()
def write(data, callback=None):
debug('client: writing %r (fd=%d)' % (data, fd))
stream.write('%s\n' % data, callback)
wait()
def wait():
stream.read_until('\n', lambda d: handle_line(d.strip()))
debug('client: begin (fd=%d)' % fd)
write('hello')
### TCP
def tcp_server(handle, sock, io):
def accept(fd, events):
while True:
try:
conn, addr = sock.accept()
debug('tcp_server: accepted (fd=%d)' % conn.fileno())
handle(IOStream(conn, io))
except socket.error as exc:
if exc[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
debug('tcp_server: (fd=%d)' % sock.fileno())
io.set_handler(sock.fileno(), accept, io.READ)
return sock
def bind(addr, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind((addr, int(port)))
sock.listen(128)
return sock
def tcp_client(handle, sock, io):
debug('tcp_client: fd=%d' % sock.fileno())
handle(IOStream(sock, io))
return sock
def connect(addr, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setblocking(0)
try:
sock.connect((addr, int(port)))
except socket.error as exc:
if exc[0] != errno.EINPROGRESS:
raise
return sock
### IO
def debug(message):
print >> sys.stderr, message
class IOLoop(ioloop.IOLoop):
"""Extend the add_handler() method to become set_handler()."""
def add_handler(self, fd, handler, events):
return self.set_handler(fd, handler, events)
def set_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for
fd. If a handler is already registered, it is replaced."""
if fd in self._handlers:
self._handlers[fd] = handler
self._impl.modify(fd, events | self.ERROR)
else:
self._handlers[fd] = handler
self._impl.register(fd, events | self.ERROR)
class IOStream(iostream.IOStream):
"""Add a debug statement to _handle_events. Otherwise this acts
the same as Tornado's IOStream."""
def _handle_events(self, fd, events):
debug('IOStream: handle fd=%d events=%d want-write=%d want-read=%d' % (
fd,
events,
bool(self._write_buffer),
bool(self._read_bytes or self._read_delimiter)
))
return super(IOStream, self)._handle_events(fd, events)
class Original(ioloop._KQueue):
def register(self, fd, events):
debug('register: fd=%d events=%d filter=%d' % (fd, events, kq_filter(events)))
super(Original, self).register(fd, events)
def modify(self, fd, events):
debug('modify: fd=%d events=%d filter=%d' % (fd, events, kq_filter(events)))
super(Original, self).modify(fd, events)
def unregister(self, fd):
debug('unregister: fd=%d' % fd)
super(Original, self).unregister(fd)
def poll(self, timeout):
## This method was just copied from ioloop without changes
## except to add debugging statements.
kevents = self._kqueue.control(None, 1000, timeout)
events = []
debug('\npoll: kevents=%r' % [(e.ident, e.filter) for e in kevents])
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.filter & select.KQ_FILTER_READ:
flags |= IOLoop.READ
if kevent.filter & select.KQ_FILTER_WRITE:
flags |= IOLoop.WRITE
if kevent.flags & select.KQ_EV_ERROR:
flags |= IOLoop.ERROR
events.append((fd, flags))
debug('poll: events=%r' % events)
return events
def kq_filter(events):
state = 0
if events & IOLoop.WRITE:
state |= select.KQ_FILTER_WRITE
if events & IOLoop.READ or state == 0:
state |= select.KQ_FILTER_READ
return state
class Modified(object):
## shortcuts
if hasattr(select, 'kqueue'):
READ = select.KQ_FILTER_READ
WRITE = select.KQ_FILTER_WRITE
DELETE = select.KQ_EV_DELETE
ADD = select.KQ_EV_ADD
ENABLE = select.KQ_EV_ENABLE
DISABLE = select.KQ_EV_DISABLE
def __init__(self):
self._kq = select.kqueue()
self._active = {}
def register(self, fd, events):
debug('register: fd=%d events=%d' % (fd, events))
self._active[fd] = self.control(fd, events, 0, self.ADD)
def modify(self, fd, events):
debug('modify: fd=%d new=%d orig=%d' % (fd, events, self._active[fd]))
self._active[fd] = self.control(fd, events, self._active[fd], self.ADD)
def unregister(self, fd):
debug('unregister: fd=%d' % fd)
self.control(fd, self._active.pop(fd), 0, self.DELETE)
def control(self, fd, new, old, flags):
if old and old == new:
## Try to bail out early if there's nothing to change.
return new
elif not new & IOLoop.WRITE:
## Force a READ if there's not a WRITE.
new |= IOLoop.READ
if new & IOLoop.WRITE:
if not old & IOLoop.WRITE:
self._kq.control([select.kevent(fd, self.WRITE, flags)], 0)
elif old & IOLoop.WRITE:
self._kq.control([select.kevent(fd, self.WRITE, self.DELETE)], 0)
if new & IOLoop.READ:
if not old & IOLoop.READ:
self._kq.control([select.kevent(fd, self.READ, flags)], 0)
elif old & IOLoop.READ:
self._kq.control([select.kevent(fd, self.READ, self.DELETE)], 0)
return new
def poll(self, timeout):
kevents = self._kq.control(None, 1000, timeout)
events = []
debug('\npoll: kevents=%r' % [(e.ident, e.filter) for e in kevents])
for kevent in kevents:
fd = kevent.ident
flags = 0
if kevent.flags & select.KQ_EV_ERROR:
flags |= IOLoop.ERROR
elif kevent.filter == select.KQ_FILTER_READ:
flags |= IOLoop.READ
elif kevent.filter == select.KQ_FILTER_WRITE:
flags |= IOLoop.WRITE
events.append((fd, flags))
debug('poll: events=%r' % events)
return events
if __name__ == '__main__':
main(*sys.argv[1:])
%python ~/tmp/kqueue-write.py Modified
start: READ=8217 WRITE=8220 READ/WRITE=8221
register: fd=4 events=8220
tcp_server: (fd=6)
register: fd=6 events=8217
tcp_client: fd=7
register: fd=7 events=8216
client: begin (fd=7)
client: writing 'hello' (fd=7)
modify: fd=7 new=8220 orig=8217
modify: fd=7 new=8221 orig=8220
poll: kevents=[(6L, -1), (7L, -2)]
poll: events=[(6L, 1), (7L, 4)]
tcp_server: accepted (fd=8)
register: fd=8 events=8216
server: begin (fd=8)
modify: fd=8 new=8217 orig=8217
IOStream: handle fd=7 events=4 want-write=1 want-read=1
modify: fd=7 new=8217 orig=8221
poll: kevents=[(8L, -1)]
poll: events=[(8L, 1)]
IOStream: handle fd=8 events=1 want-write=0 want-read=1
server: got line 'hello' (fd=8)
server: writing "ECHO 'hello'" (fd=8)
modify: fd=8 new=8221 orig=8217
poll: kevents=[(8L, -2)]
poll: events=[(8L, 4)]
IOStream: handle fd=8 events=4 want-write=1 want-read=1
modify: fd=8 new=8217 orig=8221
poll: kevents=[(7L, -1)]
poll: events=[(7L, 1)]
IOStream: handle fd=7 events=1 want-write=0 want-read=1
client: got line "ECHO 'hello'" (fd=7)
client: writing 'RESET' (fd=7)
modify: fd=7 new=8221 orig=8217
poll: kevents=[(7L, -2)]
poll: events=[(7L, 4)]
IOStream: handle fd=7 events=4 want-write=1 want-read=1
modify: fd=7 new=8217 orig=8221
poll: kevents=[(8L, -1)]
poll: events=[(8L, 1)]
IOStream: handle fd=8 events=1 want-write=0 want-read=1
server: got line 'RESET' (fd=8)
server: writing 'PROCEED' (fd=8)
modify: fd=8 new=8221 orig=8217
poll: kevents=[(8L, -2)]
poll: events=[(8L, 4)]
IOStream: handle fd=8 events=4 want-write=1 want-read=1
modify: fd=8 new=8217 orig=8221
poll: kevents=[(7L, -1)]
poll: events=[(7L, 1)]
IOStream: handle fd=7 events=1 want-write=0 want-read=1
client: got line 'PROCEED' (fd=7)
client: writing 'QUIT' (fd=7)
modify: fd=7 new=8221 orig=8217
poll: kevents=[(7L, -2)]
poll: events=[(7L, 4)]
IOStream: handle fd=7 events=4 want-write=1 want-read=1
modify: fd=7 new=8217 orig=8221
poll: kevents=[(8L, -1)]
poll: events=[(8L, 1)]
IOStream: handle fd=8 events=1 want-write=0 want-read=1
server: got line 'QUIT' (fd=8)
server: writing 'GOODBYE' (fd=8)
modify: fd=8 new=8221 orig=8217
poll: kevents=[(8L, -2)]
poll: events=[(8L, 4)]
IOStream: handle fd=8 events=4 want-write=1 want-read=1
modify: fd=8 new=8217 orig=8221
poll: kevents=[(7L, -1)]
poll: events=[(7L, 1)]
IOStream: handle fd=7 events=1 want-write=0 want-read=1
client: got line 'GOODBYE' (fd=7)
unregister: fd=7
poll: kevents=[(8L, -1)]
poll: events=[(8L, 1)]
IOStream: handle fd=8 events=1 want-write=0 want-read=1
unregister: fd=8
poll: kevents=[]
poll: events=[]
%python ~/tmp/kqueue-write.py Original
start: READ=8217 WRITE=8220 READ/WRITE=8221
register: fd=4 events=8220 filter=-2
tcp_server: (fd=6)
register: fd=6 events=8217 filter=-1
tcp_client: fd=7
register: fd=7 events=8216 filter=-1
client: begin (fd=7)
client: writing 'hello' (fd=7)
modify: fd=7 events=8220 filter=-2
unregister: fd=7
register: fd=7 events=8220 filter=-2
modify: fd=7 events=8221 filter=-1
unregister: fd=7
register: fd=7 events=8221 filter=-1
poll: kevents=[(6L, -1)]
poll: events=[(6L, 5)]
tcp_server: accepted (fd=8)
register: fd=8 events=8216 filter=-1
server: begin (fd=8)
modify: fd=8 events=8217 filter=-1
unregister: fd=8
register: fd=8 events=8217 filter=-1
poll: kevents=[]
poll: events=[]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment