Palindrome checker
from curio import Queue, CancelledError | |
class Port: | |
def __init__(self): | |
self.chan = None | |
self.proc = None | |
class InputPort(Port): | |
async def recv(self): | |
tok, timestamp = await self.chan.recv() | |
self.proc._time = max(self.proc._time, timestamp) | |
return tok | |
class OutputPort(Port): | |
async def send(self, val): | |
await self.chan.send((val, self.proc._time)) | |
def connect(a, b, name=''): | |
# Connect ports together by instantiating a channel | |
chan = Channel(name) | |
# Check to make sure the ports have not been connected previously to other channels! | |
assert not a.chan, f"Channel {a} has already been connected!" | |
assert not b.chan, f"Channel {a} has already been connected!" | |
# Check to make sure the two ports are of opposite type (input/output) | |
if isinstance(a, InputPort): | |
assert isinstance(b, OutputPort), f"Channel {a} and {b} are both input ports!" | |
# Store the ports this channel is connected to | |
# b ---chan---> a | |
chan.l = b | |
chan.r = a | |
else: | |
assert isinstance(b, InputPort), f"Channel {a} and {b} are both output ports!" | |
# Store the ports this channel is connected to | |
# a ---chan---> b | |
chan.l = a | |
chan.r = b | |
# Now assign the channel to the two ports | |
a.chan = chan | |
b.chan = chan | |
class Process: | |
next_id = 0 | |
non_producer_processes = {} | |
producer_processes = {} | |
timestep = 100 | |
def __init__(self, name): | |
self.name = name | |
self.id = Process.next_id | |
Process.next_id += 1 | |
# Keep track of all source processes (join on these at the end), and non-source processes (cancel on these at the end) | |
if isinstance(self, Producer): | |
Process.producer_processes[self.id] = self | |
else: | |
Process.non_producer_processes[self.id] = self | |
# Inject the ports from the annotations on this instance | |
for name, val in self.__annotations__.items(): | |
if issubclass(val, Port): | |
port = val() | |
setattr(self, name, port) | |
port.proc = self # Store the process this port is a part of (used for updating local time in the proc on a receive) | |
# Local time | |
self._time = 0 | |
def advance_local_time(self): | |
self._time += self.timestep | |
def __str__(self): | |
return f"{self.name}.{self.id}" | |
def __repr__(self): | |
return f"{type(self).__name__}('{self.name}')" | |
def message(self, m): | |
print( f'T:{self._time}: {self} - { m }') | |
class Producer(Process): | |
# All processes that drive the system (by injecting values in on channels unconditionally) | |
# must subclass this process | |
pass | |
class Source(Producer): | |
R: OutputPort | |
def __init__(self, name, length, srcval): | |
super().__init__(name) | |
self.val = srcval | |
self.length = length | |
async def exec(self): | |
for i in range(self.length): | |
self.message(f"sending {self.val}") | |
self.advance_local_time() | |
await self.R.send(self.val) | |
self.message(f"sent {self.val}") | |
self.message("terminated") | |
class Sink(Process): | |
L: InputPort | |
def __init__(self, name): | |
super().__init__(name) | |
async def exec(self): | |
tok_count = 0 | |
try: | |
while True: | |
tok = await self.L.recv() | |
self.advance_local_time() | |
tok_count += 1 | |
self.message(f"received {tok}") | |
except CancelledError: | |
self.message(f"{tok_count} tokens received") | |
class Buffer(Process): | |
L: InputPort | |
R: OutputPort | |
def __init__(self, name): | |
super().__init__(name) | |
async def exec(self): | |
while True: | |
tok = await self.L.recv() | |
self.message(f"received {tok}") | |
self.advance_local_time() | |
self.message(f"sending {tok}") | |
await self.R.send(tok) | |
class Channel: | |
def __init__(self, name): | |
self.name = name | |
self.q = Queue(maxsize=1) # Max buffering of 1 | |
async def send(self, val): | |
await self.q.put(val) | |
async def recv(self): | |
tok = await self.q.get() | |
await self.q.task_done() | |
return tok | |
async def close(self): | |
await self.q.join() | |
from curio import spawn | |
async def run_all(): | |
source_tasks = [] | |
other_tasks = [] | |
for p in Process.producer_processes.values(): | |
source_tasks.append(await spawn(p.exec())) | |
for p in Process.non_producer_processes.values(): | |
other_tasks.append(await spawn(p.exec())) | |
# Now wait for all sources to end | |
for task in source_tasks: | |
await task.join() | |
for task in other_tasks: | |
await task.cancel() |
from curio import run | |
from chp import * | |
class Pal(Process): | |
C: InputPort | |
Ans: OutputPort | |
Cr: OutputPort | |
Ansr: InputPort | |
async def exec(self): | |
# Do the first char (so we store it) | |
first = await self.C.recv() | |
is_palindrome = True | |
await self.Ans.send(is_palindrome) | |
while True: | |
x = await self.C.recv() | |
is_palindrome = (x==first and is_palindrome) | |
self.advance_local_time() | |
await self.Ans.send(is_palindrome) | |
await self.Cr.send(x) | |
is_palindrome = await self.Ansr.recv() | |
class Env(Producer): | |
R: OutputPort | |
A: InputPort | |
def __init__(self, name, string): | |
super().__init__(name) | |
self.string = string | |
async def exec(self): | |
for i in range(len(self.string)): | |
await self.R.send(self.string[i]) | |
is_palindrome = await self.A.recv() | |
self.message(f'{self.string[:i+1]} - {is_palindrome}') | |
self.advance_local_time() | |
async def system(): | |
string = 'amanaplanacatahamayakayamahatacanalpanama' | |
env = Env('env', string) | |
pals = [] | |
for i in range(len(string)): | |
pals.append(Pal(f'pal[{i}]')) | |
connect(env.R, pals[0].C) | |
connect(env.A, pals[0].Ans) | |
for i in range(len(string)-1): | |
connect(pals[i].Cr, pals[i+1].C) | |
connect(pals[i+1].Ans, pals[i].Ansr) | |
await run_all() | |
run(system(), with_monitor=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment