Skip to content

Instantly share code, notes, and snippets.

@vrajivk
Created May 5, 2015
Embed
What would you like to do?
Go style channels using PEP492 async/await
from collections import deque, namedtuple
import types
class Channel:
def __init__(self, name='', size=1):
self.deque = deque(maxlen=size)
self.name = name
@types.coroutine
def transmit(self, message):
yield (self, "transmit")
self.deque.append(message)
@types.coroutine
def receive(self):
yield (self, "receive")
return self.deque.popleft()
def full(self):
return len(self.deque) == self.deque.maxlen
def empty(self):
return len(self.deque) == 0
def __repr__(self):
return self.name
Waiting = namedtuple('Waiting', "function channel mode")
ready_transmit = lambda w: w.mode == 'transmit' and not w.channel.full()
ready_receive = lambda w: w.mode == 'receive' and not w.channel.empty()
class Loop:
def __init__(self):
self.pending = set()
def run(self, f):
v = f.send(None)
self.pending.add(Waiting(f, *v))
def _pick(self):
candidates = set(w for w in self.pending if ready_transmit(w) or ready_receive(w))
try:
pick = candidates.pop()
except KeyError:
raise Exception('deadlock!')
else:
self.pending.remove(pick)
return pick
def run_until_complete(self, f):
self.run(f)
while any(w.function is f for w in self.pending):
pick = self._pick()
try:
self.run(pick.function)
except StopIteration:
pass
from itertools import count
import sys
from channel import Channel, Loop
async def generate(ch):
for i in count(2):
await ch.transmit(i)
async def filter_(in_, out, prime):
while True:
i = await in_.receive()
if i%prime != 0:
await out.transmit(i)
async def main(n, loop):
ch = Channel()
loop.run(generate(ch))
for i in range(n):
prime = await ch.receive()
print(prime)
ch1 = Channel()
loop.run(filter_(ch, ch1, prime))
ch = ch1
if __name__ == '__main__':
loop = Loop()
n = int(sys.argv[1])
loop.run_until_complete(main(n, loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment