Skip to content

Instantly share code, notes, and snippets.

@vsajip
Forked from sznurek/coroutine.py
Created February 20, 2014 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vsajip/9111449 to your computer and use it in GitHub Desktop.
Save vsajip/9111449 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import socket
import select
def command(name, args=None):
return {'name': name, 'args': args}
def readcmd():
return command('read')
def writecmd(payload):
return command('write', {'payload': payload})
# This should be easiest to understand (epoll_interpret is much more
# complicated, even after some simplifications).
def stdin_interpret(generator):
cmd = generator.send(None) # Get initial command
try:
while True:
if cmd['name'] == 'read':
data = sys.stdin.readline()
cmd = generator.send(data.strip())
elif cmd['name'] == 'write':
sys.stdout.write(cmd['args']['payload'] + "\n")
cmd = generator.send(None)
sys.stdout.flush()
except StopIteration:
print("[intepretation done]")
return
# To simplify a bit I assumed that one line of data will be read/written
# in one recv/send call. In real implementation buffering will be necessary.
#
# This is probably more messy than it's need to be.
def epoll_interpret(sock, handler):
poll = select.epoll()
poll.register(sock.fileno(), select.EPOLLIN)
connections = {}
def close_connection(fd):
poll.unregister(fd)
connections[fd]['sock'].close()
del connections[fd]
def safe_handler_send(fd, data):
try:
connections[fd]['cmd'] = connections[fd]['handler'].send(data)
except StopIteration:
close_connection(fd)
while True:
events = poll.poll()
for (fd, event) in events:
if fd == sock.fileno():
conn, addr = sock.accept()
g = handler()
cmd = g.send(None)
connections[conn.fileno()] = {'sock': conn, 'cmd': cmd, 'handler': g}
poll.register(conn.fileno(), 0)
elif (event & select.EPOLLERR) != 0:
h= connections[fd]['handler']
close_connection(fd)
try:
h.throw(IOError())
except StopIteration:
pass
elif (event & select.EPOLLOUT) != 0:
connections[fd]['sock'].send(bytes(connections[fd]['cmd']['args']['payload'] + "\n", 'utf-8'))
safe_handler_send(fd, None)
else:
data = connections[fd]['sock'].recv(4096)
safe_handler_send(fd, str(data.strip(), 'utf-8'))
for fd, data in connections.items():
events = select.EPOLLERR
if data['cmd']['name'] == 'read':
events |= select.EPOLLIN
elif data['cmd']['name'] == 'write':
events |= select.EPOLLOUT
poll.modify(fd, events)
#!/usr/bin/env python3
from coroutine import *
"""
Python 3.3 required! (I need 'yield from' statement)
This is a proof of concept of multi client server written in asynchronous
style using Python's generators (or coroutines if you like).
Usage: python3 example.py [port-num], then nc localhost [port-num]. Login
and password are 'test' and 'pass' respectively. After successful login
it will behave as line-based echo server.
Even though code of 'cat' and 'perform_login' looks synchronous they are
'callback-driven' under the hood and all computation happens in the
same system level thread. Take a look at 'coroutine.epoll_interpret'.
Why this code was written?
1) I am sick of programming in callback passing style (come on, compilers/
interpreters should know how to make CPS transformation).
2) Look at the bottom of the file: the `epoll_interpret` function is
replaceable: I can test application logic from stdin, or I can feed
the input from list in unit test or whatever.
3) I wanted to learn Python's generators. They are cool.
"""
def cat():
while True:
try:
data = yield readcmd()
yield writecmd(data)
except IOError: # If user leaves, forgive him.
return
def perform_login(login, password):
yield writecmd('LOGIN')
user_login = yield readcmd()
yield writecmd('PASSWORD')
user_password = yield readcmd()
if user_login == login and user_password == password:
yield writecmd('OK')
print("User %s authenticated successfully!" % login)
# This new Python's feature is crucial for this example to work!
yield from cat()
else:
yield writecmd('ERR invalid login')
def thread_factory():
return perform_login('test', 'pass')
if __name__ == "__main__":
if len(sys.argv) < 2:
stdin_interpret(thread_factory())
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', int(sys.argv[1])))
sock.listen(5)
epoll_interpret(sock, thread_factory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment