Skip to content

Instantly share code, notes, and snippets.

@virantha
Last active October 1, 2017 19:32
Show Gist options
  • Save virantha/4c9b770244de999117f42b6002f08178 to your computer and use it in GitHub Desktop.
Save virantha/4c9b770244de999117f42b6002f08178 to your computer and use it in GitHub Desktop.
Palindrome checker
from curio import Queue, CancelledError
class Port:
def __init__(self):
self.chan = None
self.proc = None
class InputPort(Port):
async def recv(self):
tok, timestamp = await self.chan.recv()
self.proc._time = max(self.proc._time, timestamp)
return tok
class OutputPort(Port):
async def send(self, val):
await self.chan.send((val, self.proc._time))
def connect(a, b, name=''):
# Connect ports together by instantiating a channel
chan = Channel(name)
# Check to make sure the ports have not been connected previously to other channels!
assert not a.chan, f"Channel {a} has already been connected!"
assert not b.chan, f"Channel {a} has already been connected!"
# Check to make sure the two ports are of opposite type (input/output)
if isinstance(a, InputPort):
assert isinstance(b, OutputPort), f"Channel {a} and {b} are both input ports!"
# Store the ports this channel is connected to
# b ---chan---> a
chan.l = b
chan.r = a
else:
assert isinstance(b, InputPort), f"Channel {a} and {b} are both output ports!"
# Store the ports this channel is connected to
# a ---chan---> b
chan.l = a
chan.r = b
# Now assign the channel to the two ports
a.chan = chan
b.chan = chan
class Process:
next_id = 0
non_producer_processes = {}
producer_processes = {}
timestep = 100
def __init__(self, name):
self.name = name
self.id = Process.next_id
Process.next_id += 1
# Keep track of all source processes (join on these at the end), and non-source processes (cancel on these at the end)
if isinstance(self, Producer):
Process.producer_processes[self.id] = self
else:
Process.non_producer_processes[self.id] = self
# Inject the ports from the annotations on this instance
for name, val in self.__annotations__.items():
if issubclass(val, Port):
port = val()
setattr(self, name, port)
port.proc = self # Store the process this port is a part of (used for updating local time in the proc on a receive)
# Local time
self._time = 0
def advance_local_time(self):
self._time += self.timestep
def __str__(self):
return f"{self.name}.{self.id}"
def __repr__(self):
return f"{type(self).__name__}('{self.name}')"
def message(self, m):
print( f'T:{self._time}: {self} - { m }')
class Producer(Process):
# All processes that drive the system (by injecting values in on channels unconditionally)
# must subclass this process
pass
class Source(Producer):
R: OutputPort
def __init__(self, name, length, srcval):
super().__init__(name)
self.val = srcval
self.length = length
async def exec(self):
for i in range(self.length):
self.message(f"sending {self.val}")
self.advance_local_time()
await self.R.send(self.val)
self.message(f"sent {self.val}")
self.message("terminated")
class Sink(Process):
L: InputPort
def __init__(self, name):
super().__init__(name)
async def exec(self):
tok_count = 0
try:
while True:
tok = await self.L.recv()
self.advance_local_time()
tok_count += 1
self.message(f"received {tok}")
except CancelledError:
self.message(f"{tok_count} tokens received")
class Buffer(Process):
L: InputPort
R: OutputPort
def __init__(self, name):
super().__init__(name)
async def exec(self):
while True:
tok = await self.L.recv()
self.message(f"received {tok}")
self.advance_local_time()
self.message(f"sending {tok}")
await self.R.send(tok)
class Channel:
def __init__(self, name):
self.name = name
self.q = Queue(maxsize=1) # Max buffering of 1
async def send(self, val):
await self.q.put(val)
async def recv(self):
tok = await self.q.get()
await self.q.task_done()
return tok
async def close(self):
await self.q.join()
from curio import spawn
async def run_all():
source_tasks = []
other_tasks = []
for p in Process.producer_processes.values():
source_tasks.append(await spawn(p.exec()))
for p in Process.non_producer_processes.values():
other_tasks.append(await spawn(p.exec()))
# Now wait for all sources to end
for task in source_tasks:
await task.join()
for task in other_tasks:
await task.cancel()
from curio import run
from chp import *
class Pal(Process):
C: InputPort
Ans: OutputPort
Cr: OutputPort
Ansr: InputPort
async def exec(self):
# Do the first char (so we store it)
first = await self.C.recv()
is_palindrome = True
await self.Ans.send(is_palindrome)
while True:
x = await self.C.recv()
is_palindrome = (x==first and is_palindrome)
self.advance_local_time()
await self.Ans.send(is_palindrome)
await self.Cr.send(x)
is_palindrome = await self.Ansr.recv()
class Env(Producer):
R: OutputPort
A: InputPort
def __init__(self, name, string):
super().__init__(name)
self.string = string
async def exec(self):
for i in range(len(self.string)):
await self.R.send(self.string[i])
is_palindrome = await self.A.recv()
self.message(f'{self.string[:i+1]} - {is_palindrome}')
self.advance_local_time()
async def system():
string = 'amanaplanacatahamayakayamahatacanalpanama'
env = Env('env', string)
pals = []
for i in range(len(string)):
pals.append(Pal(f'pal[{i}]'))
connect(env.R, pals[0].C)
connect(env.A, pals[0].Ans)
for i in range(len(string)-1):
connect(pals[i].Cr, pals[i+1].C)
connect(pals[i+1].Ans, pals[i].Ansr)
await run_all()
run(system(), with_monitor=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment