Created
June 30, 2013 22:12
-
-
Save coot/5897200 to your computer and use it in GitHub Desktop.
PEP 342 (http://www.python.org/dev/peps/pep-0342/) contains an example of a Trampoline (i.e. a manager of coroutines) and an example of a simple server based on it. It is lacking a few details, though. Here is a working example. It does not catch the StopIteration exception by design. You can add time.sleep(1) in the Trampoline.run method to slow it down.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections
import types
import sys
import time
import logging


def _attach(handler, level):
    """Give *handler* its own threshold, hook it onto the root logger, return it."""
    handler.setLevel(level)
    logger.addHandler(handler)
    return handler


# Root logger accepts everything from DEBUG up; each handler filters further.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Console handler: INFO and above.
stdh = _attach(logging.StreamHandler(), logging.INFO)

# File handler: full DEBUG trace in trampoline.log (mode='w' truncates the
# file every time the module is loaded).
fh = _attach(logging.FileHandler('trampoline.log', mode='w'), logging.DEBUG)
class Trampoline(object):
    """Manage communications between coroutines.

    Coroutines are generators.  Each pending step of a coroutine is stored
    in ``self.queue`` as a zero-argument ``resume`` callable built by
    :meth:`schedule`; :meth:`run` pops and calls them in FIFO order.

    A coroutine that yields another generator "calls" it: the current
    coroutine is pushed on a stack and the yielded generator runs next.
    A coroutine that yields any other value "returns" that value to the
    coroutine on top of its stack.
    """

    # Class-level default so ``running`` can be read before run() is called.
    running = False

    def __init__(self):
        # FIFO of ``resume`` callables waiting to be executed.
        self.queue = collections.deque()

    def add(self, coroutine):
        """Request that a coroutine be executed."""
        self.schedule(coroutine)

    def run(self):
        """Execute queued steps until the queue is empty or stop() is called.

        Returns the return value of the last step executed (the steps built
        by schedule() return ``None``).  Exceptions that no coroutine on the
        stack handles -- including ``StopIteration`` from a top-level
        coroutine that finishes -- propagate to the caller; ``running`` is
        reset to ``False`` in every case.
        """
        result = None
        self.running = True
        try:
            while self.running and self.queue:
                # ``__defaults__`` (rather than the Python-2-only
                # ``func_defaults``) exposes the coroutine that schedule()
                # bound as the default argument of each ``resume``.
                logger.info('run: queue {}'.format(
                    [step.__defaults__[0] for step in self.queue]))
                # Uncomment to slow the loop down while following the log:
                # time.sleep(1)
                step = self.queue.popleft()
                result = step()
            return result
        finally:
            self.running = False

    def stop(self):
        """Ask run() to return before executing the next queued step."""
        self.running = False

    def schedule(self, coroutine, stack=(), val=None, *exc):
        """Queue one step of *coroutine*.

        :param coroutine: generator to resume.
        :param stack: ``()`` or a ``(caller, caller_stack)`` pair describing
            the coroutines waiting for this one.
        :param val: value passed to ``coroutine.send()``; when *exc* is
            given it is the exception type passed to ``coroutine.throw()``.
        :param exc: remaining ``sys.exc_info()`` items (value, traceback);
            when present the step throws instead of sending.
        """
        logger.debug('schedule: {}, {}'.format(repr(coroutine), repr(val)))

        # ``coroutine`` is bound as a default argument so run() can read it
        # back from ``resume.__defaults__`` when it logs the queue.
        def resume(coroutine=coroutine):
            value = val
            logger.debug('RESUME: {} {} {}'.format(repr(coroutine),
                                                   repr(val),
                                                   repr(stack)))
            try:
                if exc:
                    logger.debug('resume: coroutine.throw({})'.format(value))
                    # NOTE: the (type, value, traceback) form of throw() is
                    # deprecated since Python 3.12 but still accepted.
                    value = coroutine.throw(value, *exc)
                else:
                    logger.debug('resume: coroutine.send({})'.format(value))
                    value = coroutine.send(value)
            except BaseException:
                # Deliberately broad: whatever the coroutine raised is
                # handed to its caller, exactly as a bare ``except`` would.
                if not stack:
                    # Nothing left in this pseudothread to handle it, let
                    # it propagate to the run loop.
                    raise
                # Send the error back to the "caller" ...
                self.schedule(stack[0], stack[1], *sys.exc_info())
                # ... and stop here.  Falling through would queue the
                # caller a second time with a stale value.
                return
            logger.debug('resume: recieved {}'.format(value))
            if isinstance(value, types.GeneratorType):
                # Yielded to a specific coroutine, push the current one on
                # the stack, and call the new one with no args.
                self.schedule(value, (coroutine, stack))
            elif stack:
                # Yielded a result, pop the stack and send the value to
                # the caller.
                self.schedule(stack[0], stack[1], value)
            else:
                # Nothing was rescheduled: this pseudothread has ended.
                logger.debug('RESUME: THIS PSEUDOTHREAD HAS ENDED\n')

        self.queue.append(resume)
class ConnectionLost(Exception):
    """Signals a dropped connection; raised by ``Socket.read`` and caught
    in ``square_handler``."""
class Socket(object):
    """In-memory stand-in for a connection, used to drive the demo.

    ``read()`` returns 1, 2, 3, ... and raises ``ConnectionLost`` once the
    counter reaches ``read_limit``; ``write()`` records what it is given.
    """

    def __init__(self, read_limit=4):
        """
        :param read_limit: the read count at which ``read()`` starts raising
            ``ConnectionLost`` (default 4, i.e. three successful reads).
        """
        # Number of read() calls made so far on this socket.
        self.read_data = 0
        self.read_limit = read_limit
        # Everything passed to write().  Kept per instance: a class-level
        # list would be shared by every Socket.
        self.data = []

    def write(self, data):
        """Record *data* in ``self.data``."""
        logger.debug('--- socket.write({})'.format(data))
        self.data.append(data)

    def read(self):
        """Return the next counter value.

        :raises ConnectionLost: once the counter reaches ``read_limit``.
        """
        self.read_data += 1
        logger.debug('--- socket.read() -> {}'.format(self.read_data))
        if self.read_data >= self.read_limit:
            raise ConnectionLost('stop iteration')
        return self.read_data
def nonblocking_read(sock):
    """Coroutine performing one read: yield whatever ``sock.read()`` returns.

    The read happens when the generator is first resumed, not when this
    function is called.
    """
    message = 'nonblocking_read {}'.format(repr(sock))
    logger.debug(message)
    received = sock.read()
    yield received
def nonblocking_accept(sock):
    """Coroutine standing in for accept(): yield *sock* itself as the
    "connected" socket."""
    logger.debug('nonblocking_accept')
    connection = sock
    yield connection
def nonblocking_write(sock, data):
    """Write *data* to *sock* and return a coroutine the caller can yield.

    The write itself happens immediately, as before.  The returned
    generator yields ``None`` once, so a coroutine doing
    ``yield nonblocking_write(sock, data)`` is resumed by the Trampoline.
    Yielding a plain ``None`` from a coroutine with an empty stack would
    instead end that coroutine's pseudothread.

    :returns: a one-step generator; callers that only want the write may
        ignore it.
    """
    logger.debug('nonblocking_write: sock={}'.format(repr(sock)))
    sock.write(data)

    def _written():
        # Single step: hands ``None`` back to whoever yielded this coroutine.
        yield None

    return _written()
def listening_socket(host, data):
    """Return a fresh ``Socket``; *host* and *data* are accepted but unused."""
    logger.debug('listening_socket')
    sock = Socket()
    return sock
def square_handler(sock):
    """Coroutine serving one connection: read a number, write its square.

    Repeats until ``ConnectionLost`` is raised at one of its yields, then
    finishes normally.
    """
    try:
        while True:
            logger.debug('echo_handler loop')
            request = (yield nonblocking_read(sock))
            squared = request ** 2
            yield nonblocking_write(sock, squared)
    except ConnectionLost:
        # exit normally if connection lost
        return
def listen_on(trampoline, sock, handler):
    """Coroutine accepting connections for as long as *trampoline* is running.

    Each accepted socket is wrapped in ``handler(...)`` and registered with
    the trampoline as a new coroutine.
    """
    while True:
        if not trampoline.running:
            return
        # get the next incoming connection
        logger.info('listen_on')
        client = (yield nonblocking_accept(sock))
        logger.info('CONNECTED_SOCKET={}'.format(client))
        # start another coroutine to handle the connection
        logger.info('TRAMPOLINE.ADD(HANDLER(CONNECTED_SOCKET))')
        worker = handler(client)
        trampoline.add(worker)
# Demo wiring: one trampoline, one listening coroutine that spawns a
# square_handler for every accepted socket.
t = Trampoline()
server = listen_on(t, listening_socket("localhost", "echo"), square_handler)
t.add(server)

# The loop is intentionally not started here; call t.run() to drive the demo.
# t.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment