Created
September 27, 2017 15:27
-
-
Save virantha/ce2bca039ce3522e1b4efccf0fe49c01 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Process: | |
next_id = 0 | |
def __init__(self, name): | |
self.name = name | |
self.id = Process.next_id | |
Process.next_id += 1 | |
def __str__(self): | |
return f"{self.name}.{self.id}" | |
def __repr__(self): | |
return f"{type(self).__name__}('{self.name}')" | |
def message(self, m): | |
print(f"{self}: {m}") | |
class Source(Process): | |
def __init__(self, name, length, srcval): | |
super().__init__(name) | |
self.val = srcval | |
self.length = length | |
def connect(self, chan): | |
self.out_chan = chan | |
async def exec(self): | |
for i in range(self.length): | |
self.message(f"sending {self.val}") | |
await self.out_chan.send(self.val) | |
self.message(f"sent {self.val}") | |
self.message("terminated") | |
from curio import CancelledError | |
class Sink(Process): | |
def __init__(self, name): | |
super().__init__(name) | |
def connect(self, chan): | |
self.in_chan = chan | |
async def exec(self): | |
tok_count = 0 | |
try: | |
while True: | |
tok = await self.in_chan.recv() | |
tok_count += 1 | |
self.message(f"received {tok}") | |
except CancelledError: | |
self.message(f"{tok_count} tokens received") | |
class Buffer(Process): | |
def __init__(self, name): | |
super().__init__(name) | |
def connect(self, chan_l, chan_r): | |
self.l_chan = chan_l | |
self.r_chan = chan_r | |
async def exec(self): | |
while True: | |
tok = await self.l_chan.recv() | |
self.message(f"received {tok}") | |
self.message(f"sending {tok}") | |
await self.r_chan.send(tok) | |
from curio import Queue | |
class Channel: | |
def __init__(self, name): | |
self.name = name | |
self.q = Queue(maxsize=1) # Max buffering of 1 | |
async def send(self, val): | |
await self.q.put(val) | |
async def recv(self): | |
tok = await self.q.get() | |
await self.q.task_done() | |
return tok | |
async def close(self): | |
await self.q.join() | |
from curio import run, spawn | |
async def system(): | |
N = 10 # How many buffers in our linear pipeline | |
# Define the channels | |
chan_l = Channel('l') | |
chan_r = [] | |
for i in range(N): | |
chan_r.append(Channel(f'R[{i}]')) | |
# Instantiate the processes | |
src = Source('src1', 10, 1) | |
buf = [Buffer(f'buf[{i}]') for i in range(N)] | |
snk = Sink('snk') | |
# Connect the processes with the channels | |
src.connect(chan_l) | |
buf[0].connect(chan_l, chan_r[0]) | |
for i in range(1, N): | |
buf[i].connect(chan_r[i-1], chan_r[i]) | |
snk.connect(chan_r[N-1]) | |
# Start the processes | |
p_src = await spawn(src.exec()) | |
p_snk = await spawn(snk.exec()) | |
p_buf = [await spawn(buf[i].exec()) for i in range(N)] | |
# Wait for the source to finish sending all its values | |
await p_src.join() | |
# Cancel the remaining processes | |
for i in range(N): | |
await p_buf[i].cancel() | |
await p_snk.cancel() | |
if __name__=='__main__': | |
run(system(), with_monitor=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment