Skip to content

Instantly share code, notes, and snippets.

View kunigami's full-sized avatar
:octocat:
Working from home

Guilherme Kunigami kunigami

:octocat:
Working from home
View GitHub Profile
def generator():
    """Yield everything from generator2(), then everything from generator3().

    Delegation is done with ``yield from``, so each sub-generator is run to
    exhaustion in turn.
    """
    yield from generator2()
    yield from generator3()
def generator2():
    """Yield the integers 0 through 9 in ascending order."""
    for i in range(10):
        yield i
def generator3():
    """Yield the integers 10 through 19 in ascending order."""
    for j in range(10, 20):
        yield j
# NOTE(review): this snippet is cut off -- the `for` loop below has no body in
# this excerpt. It looks like the explicit-loop counterpart of delegating with
# `yield from`; confirm against the full source before relying on it.
def generator():
for i in generator2():
def add(x, y):
    """Generator that yields ``x + y`` exactly once.

    Because the body contains ``yield``, calling ``add(x, y)`` does not compute
    anything by itself; the sum is produced when the generator is advanced.
    """
    yield x + y
def main():
    """Generator that yields an ``add(2, 2)`` generator and prints the reply.

    The first ``yield`` hands the (not yet started) ``add`` generator to
    whoever is driving this generator; the value sent back in is bound to
    ``result`` and printed. The trailing bare ``yield`` suspends once more so
    that the ``send()`` delivering the result does not raise StopIteration.
    """
    result = yield add(2, 2)
    print(result)
    yield
# Calling a generator function only creates the generator object; its body
# does not start running until the generator is first advanced.
main_cr = main()
# add() is not executed
# NOTE(review): truncated excerpt -- the `try` below has no `except`/`finally`
# clause here, and the code that sets `self.target` / `self.sendval` is not
# shown.
class Task(object):
def run(self):
# Repeatedly resume self.target, passing in the pending self.sendval.
while True:
try:
result = self.target.send(self.sendval)
# A SystemCall result is returned to run()'s caller rather than handled here.
if isinstance(result, SystemCall):
return result
# sub-routine call
def server(port):
    """Generator-based accept loop for a TCP server on ``port``.

    Creates a listening socket bound to all interfaces (empty host string),
    then loops forever: it yields a ``ReadWait`` request for the listening
    socket, accepts a connection once resumed, and yields a ``NewTask``
    request wrapping ``handle_client(client, addr)``.

    Args:
        port: Port number to bind the listening socket to.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(("", port))
    sock.listen(5)
    while True:
        # Suspend before accept(); presumably the driver resumes this
        # generator once the socket is readable -- confirm against the
        # scheduler implementation.
        yield ReadWait(sock)
        client, addr = sock.accept()
        yield NewTask(handle_client(client, addr))
class ReadWait(SystemCall):
    """System call that registers the calling task as waiting to read a file.

    Args:
        file: Object exposing ``fileno()`` (e.g. a socket).
    """

    def __init__(self, file):
        self.file = file

    def handle(self):
        """Register ``self.task`` with the scheduler as waiting on the file's descriptor."""
        # self.sched and self.task are not set in this class; presumably the
        # scheduler assigns them before calling handle() -- confirm.
        fd = self.file.fileno()
        self.sched.waitforread(self.task, fd)
# NOTE(review): only the class header is visible in this excerpt; the class
# body is not shown.
class Scheduler(object):
# Excerpt from http://www.dabeaz.com/coroutines/pyos4.py
# coroutine that will be wrapped in a task
def foo():
    """Example coroutine that issues a ``GetTid`` system call.

    Yields a ``GetTid`` instance and binds whatever value is sent back to
    ``mytid`` (presumably the task id, going by the name -- confirm against
    the ``GetTid`` handler).
    """
    mytid = yield GetTid()
    ...
# A system call
# NOTE(review): truncated excerpt -- `handle` has no body here, so what this
# system call actually does is not visible.
class GetTid(SystemCall):
def handle(self):
def get_words(step):
    """Feed every line of the system word list into ``step``.

    Each line of ``/usr/share/dict/words`` is stripped of surrounding
    whitespace and passed to ``step.send``.

    Args:
        step: Object exposing ``send(value)``, e.g. an already-primed
            generator-based coroutine.

    Raises:
        OSError: If the word list cannot be opened.
    """
    # The context manager closes the file even if step.send() raises; the
    # previous version never closed it.
    with open('/usr/share/dict/words', 'r') as f:
        for line in f:
            step.send(line.strip())
# NOTE(review): truncated excerpt -- the `if` below has no body here, so what
# happens to a matching line (and how `step` is used) is not visible.
@coroutine
def filter_step(step):
while True:
# Suspend until a caller sends the next line.
line = (yield)
# True only when the last three characters of the line are 'ion'.
if (line[-3:] == 'ion'):
@coroutine
def echo():
    """Coroutine that prints every value sent to it.

    Prints "initialized" when first advanced, then loops forever, printing
    each value received via ``send()``.
    """
    print("initialized")
    while True:
        line = (yield)
        print(line)
def coroutine(fn):
    """Decorator that primes a generator-based coroutine.

    The wrapped callable creates the generator by calling ``fn`` and advances
    it once with ``next()`` so it is suspended at its first ``yield`` and can
    immediately accept ``send()``.

    Args:
        fn: Generator function to wrap.

    Returns:
        A function with the same call signature as ``fn`` that returns the
        already-advanced generator.
    """
    import functools  # local import keeps this block self-contained

    # functools.wraps preserves fn's __name__/__doc__ on the wrapper.
    @functools.wraps(fn)
    def start(*args, **kwargs):
        gen = fn(*args, **kwargs)
        next(gen)  # run up to the first yield
        return gen

    return start