Skip to content

Instantly share code, notes, and snippets.

@coot
Created June 30, 2013 22:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coot/5897200 to your computer and use it in GitHub Desktop.
Save coot/5897200 to your computer and use it in GitHub Desktop.
PEP 342 (http://www.python.org/dev/peps/pep-0342/) contains an example of a Trampoline (i.e. a manager of coroutines) and an example of a simple server based on it. It is lacking a few details, though. Here is a working example. By design, it does not catch the StopIteration exception. You can add time.sleep(1) in the Trampoline.run method to slow it down.
import collections
import types
import sys
import time
import logging
# Configure the root logger: it accepts everything from DEBUG upwards and
# leaves the filtering to the two handlers attached below.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Stream handler (logging's default stream is stderr): INFO and above only.
stdh = logging.StreamHandler()
stdh.setLevel(logging.INFO)
logger.addHandler(stdh)
# File handler: full DEBUG trace; mode='w' truncates trampoline.log each time
# this module is executed.
fh = logging.FileHandler('trampoline.log', mode='w')
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
class Trampoline(object):
    """Manage communications between coroutines.

    Coroutines are generator objects.  Each call to ``schedule`` queues one
    "step" (a ``resume`` closure) that sends a value, or throws an exception,
    into a coroutine.  ``run`` executes queued steps in FIFO order.

    A coroutine that yields another generator "calls" it: the yielding
    coroutine is pushed on a stack and resumed later with whatever value the
    called generator yields, or with the exception it raised.
    """

    # Class-level default so ``running`` is readable before ``run`` is called.
    running = False

    def __init__(self):
        # FIFO of zero-argument ``resume`` callables created by ``schedule``.
        self.queue = collections.deque()

    def add(self, coroutine):
        """Request that a coroutine be executed."""
        self.schedule(coroutine)

    def run(self):
        """Execute queued steps until the queue is empty or ``stop`` is called.

        Returns the result of the last step executed (steps return ``None``),
        or ``None`` if nothing was queued.  An exception raised by a
        coroutine that has no caller on its stack propagates out of this
        method; ``running`` is reset to ``False`` in every case.
        """
        result = None
        self.running = True
        try:
            while self.running and self.queue:
                # ``__defaults__`` (available on Python 2.6+ and 3.x) holds
                # the coroutine bound as ``resume``'s default argument; the
                # Python-2-only spelling ``func_defaults`` fails on Python 3.
                logger.info('run: queue {}'.format(
                    [func.__defaults__[0] for func in self.queue]))
                # Insert ``time.sleep(1)`` here to slow the loop down.
                func = self.queue.popleft()
                result = func()
            return result
        finally:
            self.running = False

    def stop(self):
        """Ask ``run`` to exit before executing the next queued step."""
        self.running = False

    def schedule(self, coroutine, stack=(), val=None, *exc):
        """Queue one step of ``coroutine``.

        coroutine -- the generator to resume.
        stack     -- ``()`` for a top-level coroutine, otherwise a nested
                     ``(caller, caller_stack)`` pair.
        val       -- value to ``send``; when ``exc`` is non-empty it is the
                     first argument passed to ``throw`` instead.
        exc       -- remaining ``throw`` arguments; callers in this class pass
                     ``*sys.exc_info()``, so ``val`` is the exception type and
                     ``exc`` is ``(exception, traceback)``.
        """
        logger.debug('schedule: {}, {}'.format(repr(coroutine),
                                               repr(val)))

        # ``coroutine`` is bound as a default argument so that ``run`` can
        # read it back through ``resume.__defaults__`` for its log line.
        def resume(coroutine=coroutine):
            value = val
            logger.debug('RESUME: {} {} {}'.format(repr(coroutine),
                                                   repr(val),
                                                   repr(stack)))
            try:
                if exc:
                    logger.debug(
                        'resume: coroutine.throw({})'.format(value))
                    value = coroutine.throw(value, *exc)
                else:
                    logger.debug(
                        'resume: coroutine.send({})'.format(value))
                    value = coroutine.send(value)
                logger.debug('resume: recieved {}'.format(value))
            except:  # deliberately bare: every exception is forwarded
                if stack:
                    # send the error back to the "caller"
                    self.schedule(
                        stack[0], stack[1], *sys.exc_info()
                    )
                    # Stop here.  Falling through would reach the
                    # ``elif stack`` branch below and schedule the caller a
                    # second time with the stale ``value``.
                    return
                else:
                    # Nothing left in this pseudothread to
                    # handle it, let it propagate to the
                    # run loop
                    raise

            if isinstance(value, types.GeneratorType):
                # Yielded to a specific coroutine, push the
                # current one on the stack, and call the new
                # one with no args
                self.schedule(value, (coroutine, stack))
            elif stack:
                # Yielded a result, pop the stack and send the
                # value to the caller
                self.schedule(stack[0], stack[1], value)
            else:
                # Top-level coroutine yielded a plain value: nothing is
                # rescheduled, so this pseudothread has ended.
                logger.debug('RESUME: THIS PSEUDOTHREAD HAS ENDED\n')

        self.queue.append(resume)
class ConnectionLost(Exception):
    """Signal that a connection is no longer usable.

    In this file ``Socket.read`` raises it and ``square_handler`` catches it.
    """
class Socket(object):
    """In-memory stand-in for a socket, used by the demo server.

    ``write`` records values in ``self.data``.  ``read`` returns an
    increasing counter (1, 2, 3) and raises ``ConnectionLost`` from the
    fourth call onwards.
    """

    def __init__(self):
        # Values passed to write().  Kept per instance: a class-level list
        # would be shared by every Socket ever created.
        self.data = []
        # Number of read() calls so far; also the value read() returns.
        self.read_data = 0

    def write(self, data):
        """Append ``data`` to this socket's ``data`` list."""
        logger.debug('--- socket.write({})'.format(data))
        self.data.append(data)

    def read(self):
        """Return the next counter value.

        Raises ``ConnectionLost`` once the counter reaches 4.
        """
        self.read_data += 1
        logger.debug('--- socket.read() -> {}'.format(self.read_data))
        if self.read_data >= 4:
            raise ConnectionLost('stop iteration')
        return self.read_data
def nonblocking_read(sock):
    """Generator that yields the result of a single ``sock.read()`` call.

    Nothing runs until the generator is first advanced; the log line and the
    read both happen at that point.
    """
    message = 'nonblocking_read {}'.format(repr(sock))
    logger.debug(message)
    received = sock.read()
    yield received
def nonblocking_accept(sock):
    """Generator that yields ``sock`` itself as the accepted connection."""
    message = 'nonblocking_accept'
    logger.debug(message)
    accepted = sock
    yield accepted
def nonblocking_write(sock, data):
    """Write ``data`` to ``sock`` immediately and return ``None``.

    Unlike ``nonblocking_read`` and ``nonblocking_accept`` this is a plain
    function, not a generator: the write happens at call time.
    """
    message = 'nonblocking_write: sock={}'.format(repr(sock))
    logger.debug(message)
    sock.write(data)
    return None
def listening_socket(host, data):
    """Return a fresh ``Socket``.

    ``host`` and ``data`` are accepted but not used.
    """
    logger.debug('listening_socket')
    new_socket = Socket()
    return new_socket
def square_handler(sock):
    """Coroutine: read a value from ``sock``, square it, write it back.

    Repeats until ``ConnectionLost`` is raised into it, then finishes
    normally.
    """
    try:
        while True:
            logger.debug('echo_handler loop')
            received = yield nonblocking_read(sock)
            yield nonblocking_write(sock, received ** 2)
    except ConnectionLost:
        # exit normally if connection lost
        return
def listen_on(trampoline, sock, handler):
    """Coroutine: accept connections on ``sock`` while the trampoline runs.

    Each accepted connection is passed to ``handler`` and the resulting
    coroutine is added to ``trampoline``.
    """
    while True:
        if not trampoline.running:
            break
        # wait for the next incoming connection
        logger.info('listen_on')
        conn = yield nonblocking_accept(sock)
        logger.info('CONNECTED_SOCKET={}'.format(conn))
        # hand the connection to a new coroutine
        logger.info('TRAMPOLINE.ADD(HANDLER(CONNECTED_SOCKET))')
        connection_coroutine = handler(conn)
        trampoline.add(connection_coroutine)
# Demo wiring: build a trampoline and queue the listening coroutine on it.
t = Trampoline()
# listen_on is a generator function, so this only creates the coroutine;
# none of its body runs until the trampoline resumes it.
server = listen_on(
t,
listening_socket("localhost", "echo"),
square_handler
)
t.add(server)
# The run loop is left disabled here; call t.run() to start the demo.
# t.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment