Skip to content

Instantly share code, notes, and snippets.

@Petelin
Created March 1, 2017 09:23
Show Gist options
  • Save Petelin/b0c570f3a53680f9822a7ee945b7934b to your computer and use it in GitHub Desktop.
Save Petelin/b0c570f3a53680f9822a7ee945b7934b to your computer and use it in GitHub Desktop.
使用yield实现的操作调度, 包括一个非阻塞的 sleep 方法
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Author : zhangxiaolin
# E-mail : petelin1120@gmail.com
# Date : 16/11/3 16:03
# Desc : ...
from queue import Queue
import socket
import select
from collections import defaultdict
import heapq
import datetime
import types
class Task(object):
task_count = 0
def __init__(self, target):
Task.task_count += 1
self.tid = Task.task_count
self.target = target
self.sendval = None
def run(self):
return self.target.send(self.sendval)
def kill(self):
self.target.close()
class Sleeper(Task):
def __init__(self, seconds):
now = datetime.datetime.now()
self.wait = now + datetime.timedelta(seconds=seconds)
self.stop = False
def run(self):
self.stop = True
now = datetime.datetime.now()
self.result = now
return now
def __iter__(self):
while not self.stop:
yield self
return self.result
def __lt__(self, other):
return self.wait < other.wait
def __le__(self, other):
if self.wait < other.wait:
return True
return self.__eq__(other)
def __ge__(self, other):
if self.wait > other.wait:
return True
return self.__eq__(other)
def __eq__(self, other):
return self.wait == other.wait
## system call
class SystemCall(object):
def handle(self, scheduler, task):
pass
class GETPID(SystemCall):
def handle(self, s, t):
t.sendval = t.tid
s.schedu(t)
class FORK(SystemCall):
def __init__(self, target):
self.target = target
def handle(self, scheduler, task):
tid = scheduler.new(self.target)
task.sendval = tid
scheduler.schedu(task)
class KILL(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self, scheduler, task):
kt = scheduler.runmap.get(self.tid, None)
if kt:
kt.kill()
task.sendval = True
else:
task.sendval = False
scheduler.schedu(task)
class WAIT(SystemCall):
def __init__(self, tid):
self.tid = tid
def handle(self, scheduler, current_tas):
scheduler.add_to_wait(self.tid, current_tas.tid)
class READ(SystemCall):
def __init__(self, connect):
self.connect = connect
def handle(self, scheduler, task):
scheduler.add_to_read(self.connect, task)
class WRITE(SystemCall):
def __init__(self, connect):
self.connect = connect
def handle(self, scheduler, task):
scheduler.add_to_write(self.connect, task)
class Scheduler(object):
def __init__(self):
self.ready = Queue()
self.runmap = {}
self.waitmap = defaultdict(list)
self.write_wait = {}
self.read_wait = {}
self.sleepqueue = []
def new(self, target):
"""target is a generator"""
t = Task(target)
self.runmap[t.tid] = t
self.schedu(t)
return t.tid
def schedu(self, task):
self.ready.put(task)
def exit(self, task):
print("--*- tid:{tid}, {t} exit-*--".format(tid=task.tid, t=task.target.__name__))
del self.runmap[task.tid]
if task.tid in self.waitmap:
for task in self.waitmap[task.tid]:
self.runmap[task.tid] = task
task.sendval = True
self.schedu(task)
def add_to_wait(self, run_tid, wait_tid):
return self.waitmap[run_tid].append(self.runmap.pop(wait_tid))
def add_to_read(self, connect, task):
self.read_wait[connect] = task
def add_to_write(self, connect, task):
self.write_wait[connect] = task
def ioloop(self):
if self.read_wait or self.write_wait:
rs, ws, e = select.select(self.read_wait.keys(), self.write_wait.keys(), [], 0)
for r in rs:
self.schedu(self.read_wait.pop(r))
for w in ws:
self.schedu(self.write_wait.pop(w))
def sleeploop(self):
while self.sleepqueue:
sleeper = self.sleepqueue[0]
now = datetime.datetime.now()
if now < sleeper.wait:
break
sleeper = self.sleepqueue.pop(0)
self.schedu(sleeper)
def loop(self):
while self.runmap or self.sleepqueue:
self.ioloop()
self.sleeploop()
try:
t = self.ready.get_nowait()
result = t.run()
if isinstance(result, SystemCall):
result.handle(self, t)
continue
except StopIteration:
self.exit(t)
continue
except Exception as e:
print(e)
continue
self.schedu(t)
import sys
sys.exit(0)
@types.coroutine
def isleep(t):
sper = Sleeper(t)
heapq.heappush(s.sleepqueue, sper)
result = (yield from sper)
print(result)
return result
def foo():
pid = yield GETPID()
sid = yield FORK(bar)
for i in range(1):
print("part foo: %d" % pid)
yield
a = yield KILL(sid)
print(a)
def bar():
pid = yield GETPID()
for i in range(100):
print("part bar: %d" % pid)
yield
def child():
pid = yield GETPID()
for i in range(10):
print("i am child: %d" % pid)
yield
def father():
cid = yield FORK(child)
print("wait for child")
yield WAIT(cid)
print("child done")
############### now we are good, look future, like a echo server....
def server(port):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind(("", port))
srv.listen(5)
while True:
yield READ(srv)
print("发现链接...")
client, addr = srv.accept()
yield FORK(handle_client(client, addr))
def handle_client(client, addr):
try:
print("Connection from", addr)
while True:
yield READ(client)
print(client.getpeername(), "\t接受数据")
data = client.recv(65536)
if not data:
break
from web import servicess
yield WRITE(client)
servicess.send(client, False)
client.close()
except:
pass
yield # Make the function a generator/coroutine
def heartbeat(t, sysmbol='.'):
print('heartbeat')
while 1:
# 这个sleep的时候确实阻塞的只是通过yield暂时交出了执行权利.
yield from isleep(t)
print(sysmbol)
s = Scheduler()
s.new(heartbeat(1, '-'))
s.new(heartbeat(1.5, '+'))
s.new(server(8080))
s.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment