Skip to content

Instantly share code, notes, and snippets.

@Andrew-liu
Forked from lotusnowshen/coroutine.py
Created November 10, 2015 09:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Andrew-liu/b49ae7255647766855d1 to your computer and use it in GitHub Desktop.
Save Andrew-liu/b49ae7255647766855d1 to your computer and use it in GitHub Desktop.
# coding: utf-8
import types
class Task(object):
taskid = 0
def __init__(self, target):
Task.taskid += 1
self.tid = Task.taskid
self.target = target
self.sendval = None
self.stack = [] # 协程的调用栈
def run(self):
while True:
try:
result = self.target.send(self.sendval)
if isinstance(result, SystemCall):
return result
if isinstance(result, types.GeneratorType):
self.stack.append(self.target) # 将协程保存入栈
self.sendval = None
self.target = result # 将target替换为result,也就是新产生的协程
else:
# 如果栈为空,说明没有产生新的协程
if not self.stack:
return
# 将result作为结果返回
# 从栈顶弹出之前保存的协程
self.sendval = result
self.target = self.stack.pop()
except StopIteration:
# 如果栈为空,那么直接抛出,返回给上一层
if not self.stack:
raise
# 否则直接返回None,从栈顶弹出target
self.sendval = None
self.target = self.stack.pop()
from Queue import Queue
import select
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.taskmap = {}
# 等待其他任务终止
self.exit_waiting = {}
# 等待IO
self.read_waiting = {}
self.write_waiting = {}
# 创建新的任务
def new(self, target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
# 终止某个任务
def exit(self, task):
print 'Task %d terminated ' % task.tid
del self.taskmap[task.tid]
# 依次通知那些等待本task的任务,他们可以继续执行了
for task in self.exit_waiting.pop(task.tid, []):
self.schedule(task)
# 将task加入到tid对应的数组,这样tid死后可以通知他们
def wait_for_exit(self, task, waittid):
if waittid in self.taskmap:
# 将task放入tid对应的数组,tid死后,需要通知这些task
self.exit_waiting.setdefault(waittid, []).append(task)
return True
else:
# 该任务不存在,直接返回false
return False
def wait_for_read(self, task, fd):
self.read_waiting[fd] = task
def wait_for_write(self, task, fd):
self.write_waiting[fd] = task
def io_poll(self, timeout):
if self.read_waiting or self.write_waiting:
r, w, e = select.select(self.read_waiting, self.write_waiting, [], timeout)
for fd in r:
self.schedule(self.read_waiting.pop(fd))
for fd in w:
self.schedule(self.write_waiting.pop(fd))
def io_task(self):
while True:
if self.ready.empty():
self.io_poll(None) # 永久等待
else:
self.io_poll(0) # 立刻返回(非阻塞)
yield
# 将任务放入队列
def schedule(self, task):
self.ready.put(task)
def mainloop(self):
self.new(self.io_task()) # 新建一个io loop任务,用于后面监听fd的读写事件
while self.taskmap:
task = self.ready.get()
# print '取出一个任务'
try:
result = task.run()
# print '运行一次task'
if isinstance(result, SystemCall):
result.task = task
result.sched = self
result.handle()
# 注意这里通过handle里面的schedule 将task放回队列
# 当然handle里面也可以不进行schedule,例如wait函数的实现
continue
except StopIteration:
self.exit(task)
continue # 直接执行下一次循环 不再放入
# 运行完再放回队列
self.schedule(task)
class SystemCall(object):
def __init__(self):
self.sched = None
self.task = None
def handle(self):
pass
# 返回一个任务的tid
class GetTid(SystemCall):
def __init__(self):
SystemCall.__init__(self)
print 'GetTid调用'
def handle(self):
# 向协程发送的是task的tid
self.task.sendval = self.task.tid
# 下次执行run,就自动给协程发送tid
self.sched.schedule(self.task)
# 新建一个任务
class NewTask(SystemCall):
def __init__(self, target):
SystemCall.__init__(self)
self.target = target
def handle(self):
tid = self.sched.new(self.target)
self.task.sendval = tid
# 发送的也是任务的tid
self.sched.schedule(self.task)
# 杀死一个任务
class KillTask(SystemCall):
def __init__(self, tid):
SystemCall.__init__(self)
self.tid = tid
def handle(self):
# 根据tid取出该任务
task = self.sched.taskmap.get(self.tid, None)
if task:
task.target.close() # 关闭协程
self.task.sendval = True
else:
self.task.sendval = False
self.sched.schedule(self.task)
# 等待一个任务关闭
class WaitTask(SystemCall):
def __init__(self, tid):
SystemCall.__init__(self)
self.tid = tid
def handle(self):
# tid是需要等待结束的任务id
result = self.sched.wait_for_exit(self.task, self.tid)
self.task.sendval = result
# 如果该任务不存在,那就直接调度task
# 如果存在,那就等到tid结束后再调度
if not result:
self.sched.schedule(self.task)
# 等待可读事件
class ReadWait(SystemCall):
def __init__(self, f):
SystemCall.__init__(self)
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.wait_for_read(self.task, fd)
# 将该task加入到等待队列,等select监听到它可读之后,再将其放入到可执行队列中
# 等待可写事件
class WriteWait(SystemCall):
def __init__(self, f):
SystemCall.__init__(self)
self.f = f
def handle(self):
fd = self.f.fileno()
self.sched.wait_for_write(self.task, fd)
# 将该task加入到等待队列,等select监听到它可写之后,再将其放入到可执行队列中
def Accept(sock):
yield ReadWait(sock)
yield sock.accept()
def Send(sock, buffer):
while buffer:
yield WriteWait(sock)
len = sock.send(buffer)
buffer = buffer[len:]
def Recv(sock, maxbytes):
yield ReadWait(sock)
yield sock.recv(maxbytes)
# coding: utf-8
from pyos8 import *
import socket
from sockwrap import Socket
def handle_client(client, addr):
print "Connection from ", addr
while True:
data = yield client.recv(65536)
if not data:
break
yield client.send(data)
print "Client closed"
yield client.close()
def server(port):
print "Server starting"
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
# 地址复用
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.bind(("", port))
sock.listen(5)
sock = Socket(sock)
while True:
client, addr = yield sock.accept()
yield NewTask(handle_client(client, addr))
sched = Scheduler()
sched.new(server(2007))
sched.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment