Skip to content

Instantly share code, notes, and snippets.

@virantha
Created September 27, 2017 15:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save virantha/ce2bca039ce3522e1b4efccf0fe49c01 to your computer and use it in GitHub Desktop.
Save virantha/ce2bca039ce3522e1b4efccf0fe49c01 to your computer and use it in GitHub Desktop.
class Process:
next_id = 0
def __init__(self, name):
self.name = name
self.id = Process.next_id
Process.next_id += 1
def __str__(self):
return f"{self.name}.{self.id}"
def __repr__(self):
return f"{type(self).__name__}('{self.name}')"
def message(self, m):
print(f"{self}: {m}")
class Source(Process):
def __init__(self, name, length, srcval):
super().__init__(name)
self.val = srcval
self.length = length
def connect(self, chan):
self.out_chan = chan
async def exec(self):
for i in range(self.length):
self.message(f"sending {self.val}")
await self.out_chan.send(self.val)
self.message(f"sent {self.val}")
self.message("terminated")
from curio import CancelledError
class Sink(Process):
def __init__(self, name):
super().__init__(name)
def connect(self, chan):
self.in_chan = chan
async def exec(self):
tok_count = 0
try:
while True:
tok = await self.in_chan.recv()
tok_count += 1
self.message(f"received {tok}")
except CancelledError:
self.message(f"{tok_count} tokens received")
class Buffer(Process):
def __init__(self, name):
super().__init__(name)
def connect(self, chan_l, chan_r):
self.l_chan = chan_l
self.r_chan = chan_r
async def exec(self):
while True:
tok = await self.l_chan.recv()
self.message(f"received {tok}")
self.message(f"sending {tok}")
await self.r_chan.send(tok)
from curio import Queue
class Channel:
def __init__(self, name):
self.name = name
self.q = Queue(maxsize=1) # Max buffering of 1
async def send(self, val):
await self.q.put(val)
async def recv(self):
tok = await self.q.get()
await self.q.task_done()
return tok
async def close(self):
await self.q.join()
from curio import run, spawn
async def system():
N = 10 # How many buffers in our linear pipeline
# Define the channels
chan_l = Channel('l')
chan_r = []
for i in range(N):
chan_r.append(Channel(f'R[{i}]'))
# Instantiate the processes
src = Source('src1', 10, 1)
buf = [Buffer(f'buf[{i}]') for i in range(N)]
snk = Sink('snk')
# Connect the processes with the channels
src.connect(chan_l)
buf[0].connect(chan_l, chan_r[0])
for i in range(1, N):
buf[i].connect(chan_r[i-1], chan_r[i])
snk.connect(chan_r[N-1])
# Start the processes
p_src = await spawn(src.exec())
p_snk = await spawn(snk.exec())
p_buf = [await spawn(buf[i].exec()) for i in range(N)]
# Wait for the source to finish sending all its values
await p_src.join()
# Cancel the remaining processes
for i in range(N):
await p_buf[i].cancel()
await p_snk.cancel()
if __name__=='__main__':
run(system(), with_monitor=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment