class Process:
    """Base class for a named process in the network.

    Every instance receives a unique integer id taken from the class-wide
    counter ``Process.next_id``. The counter lives on ``Process`` itself, so
    all subclasses draw their ids from the same sequence.
    """

    next_id = 0  # id that will be handed to the next instance created

    def __init__(self, name):
        """Store *name* and claim the next free id."""
        self.name = name
        self.id = Process.next_id
        Process.next_id += 1

    def __str__(self):
        """Return ``"<name>.<id>"``, the label used as a message prefix."""
        return f"{self.name}.{self.id}"

    def __repr__(self):
        """Return ``ClassName(<repr of name>)``."""
        # Use !r instead of hand-written quotes so names containing quotes or
        # backslashes (or non-string names) are rendered unambiguously.
        return f"{type(self).__name__}({self.name!r})"

    def message(self, m):
        """Print *m* to stdout, prefixed with this process's label."""
        print(f"{self}: {m}")
class Source(Process):
    """Process that sends one fixed value a fixed number of times."""

    def __init__(self, name, length, srcval):
        """Create a source that will send *srcval* *length* times."""
        super().__init__(name)
        self.length = length
        self.val = srcval

    def connect(self, chan):
        """Attach *chan* as the channel this source sends on."""
        self.out_chan = chan

    async def exec(self):
        """Send ``self.val`` on the output channel ``self.length`` times.

        A message is printed before and after every send, and a final
        "terminated" message once all sends have completed.
        """
        # The loop index is irrelevant; only the number of iterations matters.
        for _ in range(self.length):
            self.message(f"sending {self.val}")
            await self.out_chan.send(self.val)
            self.message(f"sent {self.val}")

        self.message("terminated")
from curio import CancelledError | |
class Sink(Process):
    """Process that consumes tokens from a channel until it is cancelled."""

    def __init__(self, name):
        """Create a sink called *name*."""
        super().__init__(name)

    def connect(self, chan):
        """Attach *chan* as the channel this sink receives from."""
        self.in_chan = chan

    async def exec(self):
        """Receive tokens forever, printing each one as it arrives.

        The loop never ends on its own. When the task is cancelled, the
        total number of tokens received is printed and the cancellation is
        propagated.

        Raises:
            CancelledError: re-raised after the summary message is printed.
        """
        tok_count = 0
        try:
            while True:
                tok = await self.in_chan.recv()
                tok_count += 1
                self.message(f"received {tok}")
        except CancelledError:
            self.message(f"{tok_count} tokens received")
            # Re-raise so the task is reported as cancelled rather than as
            # having finished normally; swallowing the exception hides the
            # cancellation from whoever inspects the task afterwards.
            raise
class Buffer(Process):
    """Process that forwards every token from a left to a right channel."""

    def __init__(self, name):
        """Create a buffer called *name*."""
        super().__init__(name)

    def connect(self, chan_l, chan_r):
        """Attach *chan_l* as the input and *chan_r* as the output channel."""
        self.l_chan, self.r_chan = chan_l, chan_r

    async def exec(self):
        """Copy tokens from the left channel to the right channel forever.

        The loop has no exit condition of its own; as written it only stops
        when an exception (such as a task cancellation) propagates out of
        one of the awaits.
        """
        while True:
            token = await self.l_chan.recv()
            self.message(f"received {token}")

            self.message(f"sending {token}")
            await self.r_chan.send(token)
from curio import Queue | |
class Channel:
    """Named point-to-point link backed by a queue created with maxsize=1."""

    def __init__(self, name):
        """Create a channel called *name* with its own queue."""
        self.name = name
        self.q = Queue(maxsize=1)  # Max buffering of 1

    async def send(self, val):
        """Put *val* on the queue."""
        await self.q.put(val)

    async def recv(self):
        """Take the next item off the queue and return it.

        The item is marked as done immediately after it is taken, before it
        is handed back to the caller.
        """
        item = await self.q.get()
        await self.q.task_done()
        return item

    async def close(self):
        """Wait on the queue's ``join()``.

        Nothing is torn down here: the channel stays usable afterwards.
        """
        await self.q.join()
from curio import run, spawn | |
async def system(n_buffers=10, n_tokens=10, srcval=1):
    """Build and run a linear pipeline: source -> buffers -> sink.

    Args:
        n_buffers: How many buffers sit between the source and the sink.
            With 0 the source is wired straight to the sink.
        n_tokens: How many times the source sends its value.
        srcval: The value the source sends.

    Raises:
        ValueError: if *n_buffers* is negative.

    The defaults reproduce the original fixed configuration (10 buffers,
    10 tokens of value 1).
    """
    if n_buffers < 0:
        raise ValueError(f"n_buffers must be >= 0, got {n_buffers}")

    # Define the channels. chain[k] is the input of stage k and chain[k + 1]
    # is its output; the last entry feeds the sink.
    chan_l = Channel('l')
    chan_r = [Channel(f'R[{i}]') for i in range(n_buffers)]
    chain = [chan_l, *chan_r]

    # Instantiate the processes (creation order determines their ids).
    src = Source('src1', n_tokens, srcval)
    buf = [Buffer(f'buf[{i}]') for i in range(n_buffers)]
    snk = Sink('snk')

    # Connect the processes with the channels.
    src.connect(chain[0])
    for stage, (left, right) in zip(buf, zip(chain, chain[1:])):
        stage.connect(left, right)
    snk.connect(chain[-1])

    # Start the processes.
    p_src = await spawn(src.exec())
    p_snk = await spawn(snk.exec())
    p_buf = []
    for stage in buf:
        p_buf.append(await spawn(stage.exec()))

    # Wait for the source to finish sending all its values.
    await p_src.join()

    # Cancel the remaining processes, buffers first and the sink last.
    # NOTE(review): nothing here waits for the channels to drain, so tokens
    # still travelling through the pipeline when the source finishes may
    # never reach the sink. Confirm whether that is intended.
    for task in p_buf:
        await task.cancel()
    await p_snk.cancel()
if __name__ == "__main__":
    # Script entry point: run the whole pipeline to completion.
    # with_monitor=True is handed to run() unchanged; presumably it enables
    # curio's monitor for inspecting tasks. TODO confirm against the
    # installed curio version.
    run(system(), with_monitor=True)
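# NOTE: the text "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment" is web-page footer residue
# captured with this file; it is not part of the program.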