Skip to content

Instantly share code, notes, and snippets.

@pnck
Last active January 22, 2019 08:48
Show Gist options
  • Save pnck/86c0773d5a89b68af1b7958d4bee637a to your computer and use it in GitHub Desktop.
Save pnck/86c0773d5a89b68af1b7958d4bee637a to your computer and use it in GitHub Desktop.
coroutine experiment implemented with yield
class Dispatcher:
    """Minimal round-robin scheduler for generator-based coroutines.

    Tasks are generator objects.  ``run`` resumes each registered task in
    turn until every task has finished or ``stop`` has been called.
    """

    def __init__(self):
        # Generators that still have to be resumed, in registration order.
        self.tasks = []

    def reg_task(self, t):
        """Register generator *t* so that ``run`` will resume it."""
        self.tasks.append(t)

    def run(self):
        """Resume the registered tasks round-robin until none remain.

        Each round walks a snapshot of ``self.tasks``.  Removing a finished
        task from the live list while iterating it would shift the remaining
        entries and make the loop skip the next task for that round.  A task
        registered while a round is in progress is first resumed in the
        following round.

        Exceptions other than ``StopIteration`` raised by a task propagate
        to the caller.
        """
        while self.tasks:
            for t in list(self.tasks):
                if t not in self.tasks:
                    # Dropped earlier in this round (e.g. ``stop`` was
                    # called); do not resume it.
                    continue
                try:
                    t.send(None)
                except StopIteration:
                    # The task may have called ``stop`` just before
                    # finishing, in which case it is already gone and
                    # ``remove`` would raise ValueError.
                    if t in self.tasks:
                        self.tasks.remove(t)

    def switch(self):
        """Yield once; tasks use ``yield from d.switch()`` to pause."""
        yield

    def stop(self):
        """Drop every registered task so that ``run`` stops resuming them."""
        self.tasks.clear()
# Shared dispatcher instance; the task functions below pause through
# ``d.switch()`` and the script entry point registers them on it.
d = Dispatcher()
def th1():
    """Endless task: print 'step1', pause once via ``d.switch()``, print 'step2'."""
    first_message, second_message = 'step1', 'step2'
    while True:
        print(first_message)
        # Pause here; execution continues on the next resume.
        yield from d.switch()
        print(second_message)
def th2_loop10():
    """Pause via ``d.switch()`` five times, print a message, then return 10.

    When driven with ``yield from``, the returned 10 becomes the value of
    that expression in the delegating generator.
    """
    for _ in range(5):
        yield from d.switch()
    print('many ticks passed, time to go back')
    return 10
def th3():
    """Endless task that waits for ``th2_loop10`` and then calls ``d.stop()``.

    Each pass prints a marker, pauses once, delegates to ``th2_loop10``
    until it returns, reports the returned value and stops the dispatcher.
    """
    while True:
        print('thread3_step1')
        yield from d.switch()
        print('thread3_step2: wait th2')
        # Delegation: th3 stays paused until th2_loop10 has returned.
        result = yield from th2_loop10()
        report = 'thread3_step3: th2 returned ' + str(result)
        print(report)
        d.stop()
def stop_early(tick):
    """Pause via ``d.switch()`` until the countdown hits a falsy value, then finish.

    *tick* is decremented by one before every pause; once it is falsy the
    task prints 'task finished' and ends.
    """
    remaining = tick
    while remaining:
        remaining = remaining - 1
        yield from d.switch()
    print('task finished')
if __name__ == '__main__':
    # th2_loop10 is not registered on its own; th3 drives it via ``yield from``.
    for task in (th1(), th3(), stop_early(3)):
        d.reg_task(task)
    d.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment