Skip to content

Instantly share code, notes, and snippets.

@nailor
Created October 13, 2011 11:56
Show Gist options
  • Save nailor/1284057 to your computer and use it in GitHub Desktop.
Save nailor/1284057 to your computer and use it in GitHub Desktop.
import errno
import socket
import select
class EventLoop(object):
def __init__(self):
self.callbacks = []
self.sources = []
self.handlers = {}
def add_handler(self, fd, callback):
self.handlers[fd] = callback
self.sources.append(fd)
def start(self):
poll_timeout = 0.2
while True:
self._running = True
r, w, _ = select.select(self.sources, [], [], poll_timeout)
for fd in r:
print fd
if fd in self.handlers:
self.handlers[fd]('READ')
class EchoServer(object):
def __init__(self, eventloop):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
self.socket.setblocking(0)
self.socket.bind(('localhost', 9999))
self.socket.listen(2)
self.eventloop = eventloop
def start(self):
self.eventloop.add_handler(self.socket.fileno(), self.handle_event)
def handle_event(self, event):
conn, addr = self.socket.accept()
try:
print conn.recv(4096)
except socket.error, e:
if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
print 'whops', e
return
raise
if __name__ == '__main__':
loop = EventLoop()
server = EchoServer(loop)
server.start()
loop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment