Skip to content

Instantly share code, notes, and snippets.

@virantha
Last active September 28, 2017 03:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save virantha/93f052ea6f1eef5ddf483487edcf9289 to your computer and use it in GitHub Desktop.
Save virantha/93f052ea6f1eef5ddf483487edcf9289 to your computer and use it in GitHub Desktop.
process_buffer2.py
from curio import Queue, CancelledError
class Port:
def __init__(self):
self.chan = None
class InputPort(Port):
async def recv(self):
tok = await self.chan.recv()
return tok
class OutputPort(Port):
async def send(self, val):
await self.chan.send((val))
def connect(a, b, name=''):
# Connect ports together by instantiating a channel
chan = Channel(name)
# Check to make sure the ports have not been connected previously to other channels!
assert not a.chan, f"Channel {a} has already been connected!"
assert not b.chan, f"Channel {b} has already been connected!"
# Check to make sure the two ports are of opposite type (input/output)
if isinstance(a, InputPort):
assert isinstance(b, OutputPort), f"Channel {a} and {b} are both input ports!"
# Store the ports this channel is connected to
# b ---chan---> a
chan.l = b
chan.r = a
else:
assert isinstance(b, InputPort), f"Channel {a} and {b} are both output ports!"
# Store the ports this channel is connected to
# a ---chan---> b
chan.l = a
chan.r = b
# Now assign the channel to the two ports
a.chan = chan
b.chan = chan
class Process:
next_id = 0
def __init__(self, name):
self.name = name
self.id = Process.next_id
Process.next_id += 1
def __str__(self):
return f"{self.name}.{self.id}"
def __repr__(self):
return f"{type(self).__name__}('{self.name}')"
def message(self, m):
print(f"{self}: {m}")
class Source(Process):
def __init__(self, name, length, srcval):
super().__init__(name)
self.val = srcval
self.length = length
self.R = OutputPort()
async def exec(self):
for i in range(self.length):
self.message(f"sending {self.val}")
await self.R.send(self.val)
self.message(f"sent {self.val}")
self.message("terminated")
class Sink(Process):
def __init__(self, name):
super().__init__(name)
self.L = InputPort()
async def exec(self):
tok_count = 0
try:
while True:
tok = await self.L.recv()
tok_count += 1
self.message(f"received {tok}")
except CancelledError:
self.message(f"{tok_count} tokens received")
class Buffer(Process):
def __init__(self, name):
super().__init__(name)
self.L = InputPort()
self.R = OutputPort()
async def exec(self):
while True:
tok = await self.L.recv()
self.message(f"received {tok}")
self.message(f"sending {tok}")
await self.R.send(tok)
class Channel:
def __init__(self, name):
self.name = name
self.q = Queue(maxsize=1) # Max buffering of 1
async def send(self, val):
await self.q.put(val)
async def recv(self):
tok = await self.q.get()
await self.q.task_done()
return tok
async def close(self):
await self.q.join()
from curio import run, spawn
async def system():
N = 10 # How many buffers in our linear pipeline
# Instantiate the processes
src = Source('src1', 10, 1)
buf = [Buffer(f'buf[{i}]') for i in range(N)]
snk = Sink('snk')
# Connect the processes with the channels
connect(src.R, buf[0].L)
for i in range(1, N):
connect(buf[i-1].R, buf[i].L)
connect(snk.L, buf[N-1].R)
# Start the processes
p_src = await spawn(src.exec())
p_snk = await spawn(snk.exec())
p_buf = [await spawn(buf[i].exec()) for i in range(N)]
# Wait for the source to finish sending all its values
await p_src.join()
# Cancel the remaining processes
for i in range(N):
await p_buf[i].cancel()
await p_snk.cancel()
if __name__=='__main__':
run(system(), with_monitor=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment