Created
May 5, 2015 22:10
-
-
Save vrajivk/c505310fb79d412afcd5 to your computer and use it in GitHub Desktop.
Go style channels using PEP492 async/await
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque, namedtuple | |
import types | |
class Channel: | |
def __init__(self, name='', size=1): | |
self.deque = deque(maxlen=size) | |
self.name = name | |
@types.coroutine | |
def transmit(self, message): | |
yield (self, "transmit") | |
self.deque.append(message) | |
@types.coroutine | |
def receive(self): | |
yield (self, "receive") | |
return self.deque.popleft() | |
def full(self): | |
return len(self.deque) == self.deque.maxlen | |
def empty(self): | |
return len(self.deque) == 0 | |
def __repr__(self): | |
return self.name | |
Waiting = namedtuple('Waiting', "function channel mode") | |
ready_transmit = lambda w: w.mode == 'transmit' and not w.channel.full() | |
ready_receive = lambda w: w.mode == 'receive' and not w.channel.empty() | |
class Loop: | |
def __init__(self): | |
self.pending = set() | |
def run(self, f): | |
v = f.send(None) | |
self.pending.add(Waiting(f, *v)) | |
def _pick(self): | |
candidates = set(w for w in self.pending if ready_transmit(w) or ready_receive(w)) | |
try: | |
pick = candidates.pop() | |
except KeyError: | |
raise Exception('deadlock!') | |
else: | |
self.pending.remove(pick) | |
return pick | |
def run_until_complete(self, f): | |
self.run(f) | |
while any(w.function is f for w in self.pending): | |
pick = self._pick() | |
try: | |
self.run(pick.function) | |
except StopIteration: | |
pass | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import count | |
import sys | |
from channel import Channel, Loop | |
async def generate(ch): | |
for i in count(2): | |
await ch.transmit(i) | |
async def filter_(in_, out, prime): | |
while True: | |
i = await in_.receive() | |
if i%prime != 0: | |
await out.transmit(i) | |
async def main(n, loop): | |
ch = Channel() | |
loop.run(generate(ch)) | |
for i in range(n): | |
prime = await ch.receive() | |
print(prime) | |
ch1 = Channel() | |
loop.run(filter_(ch, ch1, prime)) | |
ch = ch1 | |
if __name__ == '__main__': | |
loop = Loop() | |
n = int(sys.argv[1]) | |
loop.run_until_complete(main(n, loop)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment