Last active
November 10, 2015 09:15
-
-
Save lotusnowshen/ea4be96f37ad5a997e6c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import types | |
class Task(object): | |
taskid = 0 | |
def __init__(self, target): | |
Task.taskid += 1 | |
self.tid = Task.taskid | |
self.target = target | |
self.sendval = None | |
self.stack = [] # 协程的调用栈 | |
def run(self): | |
while True: | |
try: | |
result = self.target.send(self.sendval) | |
if isinstance(result, SystemCall): | |
return result | |
if isinstance(result, types.GeneratorType): | |
self.stack.append(self.target) # 将协程保存入栈 | |
self.sendval = None | |
self.target = result # 将target替换为result,也就是新产生的协程 | |
else: | |
# 如果栈为空,说明没有产生新的协程 | |
if not self.stack: | |
return | |
# 将result作为结果返回 | |
# 从栈顶弹出之前保存的协程 | |
self.sendval = result | |
self.target = self.stack.pop() | |
except StopIteration: | |
# 如果栈为空,那么直接抛出,返回给上一层 | |
if not self.stack: | |
raise | |
# 否则直接返回None,从栈顶弹出target | |
self.sendval = None | |
self.target = self.stack.pop() | |
from Queue import Queue | |
import select | |
class Scheduler(object): | |
def __init__(self): | |
self.ready = Queue() | |
self.taskmap = {} | |
# 等待其他任务终止 | |
self.exit_waiting = {} | |
# 等待IO | |
self.read_waiting = {} | |
self.write_waiting = {} | |
# 创建新的任务 | |
def new(self, target): | |
newtask = Task(target) | |
self.taskmap[newtask.tid] = newtask | |
self.schedule(newtask) | |
return newtask.tid | |
# 终止某个任务 | |
def exit(self, task): | |
print 'Task %d terminated ' % task.tid | |
del self.taskmap[task.tid] | |
# 依次通知那些等待本task的任务,他们可以继续执行了 | |
for task in self.exit_waiting.pop(task.tid, []): | |
self.schedule(task) | |
# 将task加入到tid对应的数组,这样tid死后可以通知他们 | |
def wait_for_exit(self, task, waittid): | |
if waittid in self.taskmap: | |
# 将task放入tid对应的数组,tid死后,需要通知这些task | |
self.exit_waiting.setdefault(waittid, []).append(task) | |
return True | |
else: | |
# 该任务不存在,直接返回false | |
return False | |
def wait_for_read(self, task, fd): | |
self.read_waiting[fd] = task | |
def wait_for_write(self, task, fd): | |
self.write_waiting[fd] = task | |
def io_poll(self, timeout): | |
if self.read_waiting or self.write_waiting: | |
r, w, e = select.select(self.read_waiting, self.write_waiting, [], timeout) | |
for fd in r: | |
self.schedule(self.read_waiting.pop(fd)) | |
for fd in w: | |
self.schedule(self.write_waiting.pop(fd)) | |
def io_task(self): | |
while True: | |
if self.ready.empty(): | |
self.io_poll(None) # 永久等待 | |
else: | |
self.io_poll(0) # 立刻返回(非阻塞) | |
yield | |
# 将任务放入队列 | |
def schedule(self, task): | |
self.ready.put(task) | |
def mainloop(self): | |
self.new(self.io_task()) # 新建一个io loop任务,用于后面监听fd的读写事件 | |
while self.taskmap: | |
task = self.ready.get() | |
# print '取出一个任务' | |
try: | |
result = task.run() | |
# print '运行一次task' | |
if isinstance(result, SystemCall): | |
result.task = task | |
result.sched = self | |
result.handle() | |
# 注意这里通过handle里面的schedule 将task放回队列 | |
# 当然handle里面也可以不进行schedule,例如wait函数的实现 | |
continue | |
except StopIteration: | |
self.exit(task) | |
continue # 直接执行下一次循环 不再放入 | |
# 运行完再放回队列 | |
self.schedule(task) | |
class SystemCall(object): | |
def __init__(self): | |
self.sched = None | |
self.task = None | |
def handle(self): | |
pass | |
# 返回一个任务的tid | |
class GetTid(SystemCall): | |
def __init__(self): | |
SystemCall.__init__(self) | |
print 'GetTid调用' | |
def handle(self): | |
# 向协程发送的是task的tid | |
self.task.sendval = self.task.tid | |
# 下次执行run,就自动给协程发送tid | |
self.sched.schedule(self.task) | |
# 新建一个任务 | |
class NewTask(SystemCall): | |
def __init__(self, target): | |
SystemCall.__init__(self) | |
self.target = target | |
def handle(self): | |
tid = self.sched.new(self.target) | |
self.task.sendval = tid | |
# 发送的也是任务的tid | |
self.sched.schedule(self.task) | |
# 杀死一个任务 | |
class KillTask(SystemCall): | |
def __init__(self, tid): | |
SystemCall.__init__(self) | |
self.tid = tid | |
def handle(self): | |
# 根据tid取出该任务 | |
task = self.sched.taskmap.get(self.tid, None) | |
if task: | |
task.target.close() # 关闭协程 | |
self.task.sendval = True | |
else: | |
self.task.sendval = False | |
self.sched.schedule(self.task) | |
# 等待一个任务关闭 | |
class WaitTask(SystemCall): | |
def __init__(self, tid): | |
SystemCall.__init__(self) | |
self.tid = tid | |
def handle(self): | |
# tid是需要等待结束的任务id | |
result = self.sched.wait_for_exit(self.task, self.tid) | |
self.task.sendval = result | |
# 如果该任务不存在,那就直接调度task | |
# 如果存在,那就等到tid结束后再调度 | |
if not result: | |
self.sched.schedule(self.task) | |
# 等待可读事件 | |
class ReadWait(SystemCall): | |
def __init__(self, f): | |
SystemCall.__init__(self) | |
self.f = f | |
def handle(self): | |
fd = self.f.fileno() | |
self.sched.wait_for_read(self.task, fd) | |
# 将该task加入到等待队列,等select监听到它可读之后,再将其放入到可执行队列中 | |
# 等待可写事件 | |
class WriteWait(SystemCall): | |
def __init__(self, f): | |
SystemCall.__init__(self) | |
self.f = f | |
def handle(self): | |
fd = self.f.fileno() | |
self.sched.wait_for_write(self.task, fd) | |
# 将该task加入到等待队列,等select监听到它可写之后,再将其放入到可执行队列中 | |
def Accept(sock): | |
yield ReadWait(sock) | |
yield sock.accept() | |
def Send(sock, buffer): | |
while buffer: | |
yield WriteWait(sock) | |
len = sock.send(buffer) | |
buffer = buffer[len:] | |
def Recv(sock, maxbytes): | |
yield ReadWait(sock) | |
yield sock.recv(maxbytes) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from pyos8 import * | |
import socket | |
from sockwrap import Socket | |
def handle_client(client, addr): | |
print "Connection from ", addr | |
while True: | |
data = yield client.recv(65536) | |
if not data: | |
break | |
yield client.send(data) | |
print "Client closed" | |
yield client.close() | |
def server(port): | |
print "Server starting" | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.setblocking(False) | |
# 地址复用 | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | |
sock.bind(("", port)) | |
sock.listen(5) | |
sock = Socket(sock) | |
while True: | |
client, addr = yield sock.accept() | |
yield NewTask(handle_client(client, addr)) | |
sched = Scheduler() | |
sched.new(server(2007)) | |
sched.mainloop() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment