Skip to content

Instantly share code, notes, and snippets.

@kjagiello
Last active December 7, 2019 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kjagiello/57dd8a47ea53feac4922caf2d518ef23 to your computer and use it in GitHub Desktop.
Save kjagiello/57dd8a47ea53feac4922caf2d518ef23 to your computer and use it in GitHub Desktop.
Experimental circuit graph executor. It allows you to build a circuit consisting of different components (gates) that are interconnected using wires, and then simulate its execution.
import time
from collections import defaultdict
class GateError(Exception):
    """Raised when a gate receives or produces an unexpected set of pins."""
class ExecutionError(Exception):
    """Raised when executing a circuit cannot make any further progress."""
class GatePin:
    """Base marker type for pin declarations placed on a gate class."""
class InputPin(GatePin):
    """Marker for a class attribute that declares an input pin of a gate."""
class OutputPin(GatePin):
    """Marker for a class attribute that declares an output pin of a gate."""
class GateMeta(type):
    """Metaclass that indexes the pin declarations of a gate class.

    Every class created through this metaclass gets two class attributes:
    ``_inputs`` and ``_outputs``, each a dict mapping the attribute name of a
    pin to the pin object declared under that name.
    """

    def __new__(cls, names, bases, attrs):
        """Build the class and attach its ``_inputs`` / ``_outputs`` tables.

        Args:
            names: Name of the class being created.
            bases: Tuple of base classes.
            attrs: Namespace of the class body.

        Returns:
            The newly created class.
        """
        mapping = {InputPin: "_inputs", OutputPin: "_outputs"}

        # Seed each table with the pins of the base classes so that
        # subclassing a concrete gate keeps the pins it already declared.
        # Bases are walked right-to-left so the leftmost base wins on clashes.
        new_attrs = {}
        for table_name in mapping.values():
            inherited = {}
            for base in reversed(bases):
                inherited.update(getattr(base, table_name, {}))
            new_attrs[table_name] = inherited

        for k, v in attrs.items():
            # An attribute defined in this class body shadows whatever the
            # bases declared under the same name, pin or not.
            for table in new_attrs.values():
                table.pop(k, None)
            # isinstance (rather than an exact class match) also accepts
            # subclasses of InputPin / OutputPin.
            for pin_type, table_name in mapping.items():
                if isinstance(v, pin_type):
                    new_attrs[table_name][k] = v
                    break

        return super().__new__(cls, names, bases, {**attrs, **new_attrs})
class Gate(metaclass=GateMeta):
    """Base class for circuit components.

    The metaclass provides the ``_inputs`` and ``_outputs`` tables (pin name
    to pin declaration) that the accessors below expose. Subclasses implement
    ``execute``.
    """

    def get_inputs(self):
        """Return the mapping of input pin names to their declarations."""
        return self._inputs

    def get_outputs(self):
        """Return the mapping of output pin names to their declarations."""
        # The metaclass stores the table as ``_outputs``; there is no
        # ``outputs`` attribute.
        return self._outputs

    def _execute(self, **inputs):
        """Run ``execute`` with validation of the pin names on both sides.

        Args:
            **inputs: One value per declared input pin, keyed by pin name.

        Returns:
            The dict produced by ``execute``, keyed by output pin name.

        Raises:
            GateError: If the supplied input names do not exactly match the
                declared input pins, or the returned output names do not
                exactly match the declared output pins.
        """
        if set(self._inputs) != set(inputs):
            raise GateError("Received invalid inputs.")
        outputs = self.execute(**inputs)
        if set(self._outputs) != set(outputs):
            raise GateError("Produced invalid outputs.")
        return outputs

    def execute(self, **inputs):
        """Takes in an input dict and returns a new output dict."""
        raise NotImplementedError
class GateBreakpoint:
    """Snapshot of a gate and the inputs it is about to be executed with."""

    def __init__(self, gate, inputs):
        """Store the gate and its current inputs.

        Args:
            gate: The gate object the breakpoint was hit on.
            inputs: Mapping of input pin name to the value on that pin.
        """
        self.gate = gate
        self.inputs = inputs

    def __repr__(self):
        """Return ``<GateClassName pin=value ...>``.

        ``str()`` falls back to this representation as well, so no separate
        ``__str__`` is needed.
        """
        inputs = " ".join(f"{k}={v!r}" for k, v in self.inputs.items())
        return f"<{type(self.gate).__name__} {inputs}>"
class CircuitGraph:
    """A collection of gates connected by wires, with a simple executor.

    Gates are registered with ``add_gate``, connected with ``add_wire`` and
    run with ``execute``.
    """

    def __init__(self, clock_speed=None):
        """Create an empty circuit.

        Args:
            clock_speed: Number of queued gates handled per second.
                ``execute`` sleeps ``1 / clock_speed`` seconds before handling
                each gate taken off its work queue; ``None`` disables the
                delay.

        Raises:
            ValueError: If ``clock_speed`` is given but is not positive
                (it is used as a divisor during execution).
        """
        if clock_speed is not None and clock_speed <= 0:
            raise ValueError("clock_speed must be a positive number or None.")
        self.clock_speed = clock_speed
        self.breakpoints = []
        self.gates = []
        # Maps a source gate to its outgoing wires, each stored as
        # (output_pin, input_gate, input_pin).
        self.output_wires = defaultdict(list)

    def add_gate(self, gate):
        """Register a gate with the circuit."""
        self.gates.append(gate)

    def add_breakpoint(self, gate):
        """Make ``execute`` yield a ``GateBreakpoint`` before running ``gate``."""
        self.breakpoints.append(gate)

    def add_wire(self, output_gate, output_pin, input_gate, input_pin):
        """Connect ``output_pin`` of ``output_gate`` to ``input_pin`` of ``input_gate``."""
        self.output_wires[output_gate].append((output_pin, input_gate, input_pin))

    def execute(self, entrypoint, **kwargs):
        """Execute the circuit starting from the entrypoint.

        Pass input values to the entrypoint gate as kwargs.

        This is a generator: nothing runs until it is iterated. It yields a
        ``GateBreakpoint`` each time a gate registered through
        ``add_breakpoint`` is about to be executed.

        Raises:
            ExecutionError: If the work queue ran dry and re-queuing the
                input-less gates did not cause any gate to execute.
        """
        # TODO: ensure the graph is complete, i.e. all the pins are connected
        propagate_inputless = True
        # Pending input values per gate: {gate: {input_pin: value}}.
        state = defaultdict(dict)
        state[entrypoint] = kwargs
        queue = [entrypoint]

        while True:
            while queue:
                gate = queue.pop()

                if self.clock_speed is not None:
                    time.sleep(1 / self.clock_speed)

                current_inputs = state[gate]
                expected_inputs = gate.get_inputs()
                # A gate only runs once every one of its input pins has a value.
                if set(expected_inputs.keys()) != set(current_inputs.keys()):
                    continue

                # Skip if this is an input-less gate that has already propagated
                # its outputs, i.e. every pin it feeds still holds an unconsumed
                # value. NOTE: a gate without outgoing wires also counts as
                # propagated (all() over nothing is True), so it never executes.
                has_propagated = all(
                    input_pin in state[input_gate]
                    for __, input_gate, input_pin in self.output_wires[gate]
                )
                if has_propagated:
                    continue

                # Check if any breakpoint was set on this gate
                if gate in self.breakpoints:
                    yield GateBreakpoint(gate=gate, inputs=current_inputs)

                # Execute the gate
                outputs = gate.execute(**current_inputs)

                # Follow the output wires for this gate and propagate the output
                # to the target gates
                for output_pin, input_gate, input_pin in self.output_wires[gate]:
                    state[input_gate][input_pin] = outputs[output_pin]
                    queue.append(input_gate)

                # Reset the input state for this gate and rerun the graph
                state[gate] = {}
                propagate_inputless = True

            # We seemingly have no work left to do in the circuit. It could be
            # either because we got stuck in the execution (probably some wires
            # missing) or we have some input-less gates that we need to manually
            # propagate. If no gate has executed since the last time the
            # input-less gates were queued, queuing them again cannot help.
            if not propagate_inputless:
                # TODO: Add the current state to the error
                raise ExecutionError("No progress was made.")

            # Queue the input-less gates once and require that doing so leads to
            # at least one gate executing before we try it again.
            queue = [gate for gate in self.gates if not gate.get_inputs()]
            propagate_inputless = False
if __name__ == "__main__":
    # Demo: a program counter that is incremented by 4 on every round trip
    # through the circuit and is reset to 0 when the incremented value
    # equals 12.

    class ConstantValue(Gate):
        """Input-less gate that always outputs the value it was built with."""

        value = OutputPin()

        def __init__(self, value):
            # The instance attribute shadows the class-level pin declaration;
            # the pin itself stays registered in the class's output table.
            self.value = value

        def execute(self):
            return {"value": self.value}

    class ProgramCounter(Gate):
        """Passes its input through unchanged."""

        counter_in = InputPin()
        counter_out = OutputPin()

        def execute(self, counter_in):
            return {"counter_out": counter_in}

    class AddGate(Gate):
        """Outputs the sum of x and y on z."""

        x = InputPin()
        y = InputPin()
        z = OutputPin()

        def execute(self, x, y):
            return {"z": x + y}

    class Comparator(Gate):
        """Compares a with b and sets c to 1 if they are equal, else 0."""

        a = InputPin()
        b = InputPin()
        c = OutputPin()

        def execute(self, a, b):
            return {"c": int(a == b)}

    class Mux(Gate):
        """Outputs b on c when the select pin s is 1, otherwise a."""

        a = InputPin()
        b = InputPin()
        s = InputPin()
        c = OutputPin()

        def execute(self, a, b, s):
            return {"c": b if s == 1 else a}

    # Setup the gates
    program_counter = ProgramCounter()
    pc_increment = ConstantValue(4)
    pc_limit = ConstantValue(12)
    pc_zero = ConstantValue(0)
    comparator = Comparator()
    add_gate = AddGate()
    mux_gate = Mux()

    # Create an empty circuit
    graph = CircuitGraph(clock_speed=4)

    # Add all the different gates to the circuit
    graph.add_gate(program_counter)
    graph.add_gate(add_gate)
    graph.add_gate(pc_increment)
    graph.add_gate(pc_limit)
    graph.add_gate(pc_zero)
    graph.add_gate(comparator)
    graph.add_gate(mux_gate)

    # Connect the pins between the gates
    graph.add_wire(program_counter, "counter_out", add_gate, "x")
    graph.add_wire(pc_increment, "value", add_gate, "y")

    # Set up the comparator
    graph.add_wire(add_gate, "z", comparator, "a")
    graph.add_wire(pc_limit, "value", comparator, "b")

    # Set up the mux
    graph.add_wire(comparator, "c", mux_gate, "s")
    graph.add_wire(add_gate, "z", mux_gate, "a")
    graph.add_wire(pc_zero, "value", mux_gate, "b")

    # Feed back the new counter from the MUX back to the PC
    graph.add_wire(mux_gate, "c", program_counter, "counter_in")

    # Mark the PC gate as a breakpoint, so that we can easily track the program
    # execution
    graph.add_breakpoint(program_counter)

    # Start the circuit
    executor = graph.execute(entrypoint=program_counter, counter_in=0)
    try:
        for brk in executor:
            print(brk)
    except KeyboardInterrupt:
        # The counter loops forever by design; Ctrl-C is the way to stop it.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment