This is a monorepo - each directory at root is a separate project with its own .venv
atria/: Core reactive engine with algebraic effects
vscode_ext/: VSCode extension
samples/: Sample applications
Project Stack
Use Python 3.13+ typing features, including | for unions and the [T] type-parameter syntax for generics
Run tests with pytest: activate the venv in each subproject (source .venv/bin/activate), then run pytest
Development Commands
Activate project venv: cd project_dir && source .venv/bin/activate
Run atria tests after activating venv: cd atria && source .venv/bin/activate && pytest
Run specific atria tests: cd atria && pytest atria/test_file.py::TestClass::test_method -v
Type checking: mypy .
Install dependencies: uv pip install -e .
Add package: uv pip install package_name
Remove package: uv pip uninstall package_name
When writing code
You can make related changes focused on a single concept across the entire file. I'll be able to understand.
Basic Code Style Guidelines
Follow PEP 8 conventions with 4-space indentation
Import packages at the top of files
Import order: standard lib → third party → local modules
Use proper type annotations and include py.typed marker
Naming: CamelCase for classes, snake_case for functions and variables
Document classes and public methods with docstrings
For keyword arguments, put a space before and after the '=' sign, contrary to usual Python style
Use American English spellings over British English: 'Color' over 'Colour', 'Canceled' over 'Cancelled'.
Put a space after { and before }, contrary to usual Python style
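The two spacing rules above depart from common Python formatting, so a small illustration may help. This is only a sketch; make_label and its fields are hypothetical names, not part of the repo.

```python
def make_label(text: str, color: str = "red") -> dict[str, str]:
    """Build a label record (hypothetical example function)."""
    # Space after { and before }, per the house style.
    return { "text": text, "color": color }


# Space before and after '=' in keyword arguments, per the house style.
label = make_label("stop", color = "blue")
```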
Basic principles for good engineering
Avoid global mutable state when possible
Use abstract methods when defining interfaces/protocols
The attributes and fields of a class must be annotated with types in the constructor.
Use types information to branch, rather than keying off existence of attributes.
Single Responsibility: Each part of the code should have a clear, focused role without redundant paths.
Reduced Indirection: Remove unnecessary intermediate steps, for example between an async operation completing and the generator resuming.
Improved Readability: The flow from async operation to completion should be linear and easy to follow.
Iterative over Recursive: Prefer an iterative approach over a recursive one to prevent potential stack overflow, unless the recursive version is much easier to reason about.
Taste for Good Engineering
Bias towards a pure functional programming style.
FP: Rather than doing a lot of if-based type checks, bias towards using match and case
FP: If state management needs to coordinate about 3 or more pieces of state, "make impossible states impossible" using types.
FP: Use appropriate error handling with typed Result objects
FP: Results should be returned, rather than mutating a parameter that's passed in.
Avoid writing code that's too deeply nested. Instead, flatten it with error checks that abort early, so the happy path falls through to the next step.
Project specific guidelines
Use dataclasses for Effect types
Always inherit from base classes (Effect, Ability, Handler)
Refactor the existing Python codebase that implements DBSP circuits so that all runtime Stream objects are moved out of the component dataclasses and into a separate circuit‑state layer.
After the refactor:
Component dataclasses (atomic & composite) describe only pure structure (types, topology, constants, parameters) — no runtime data.
CircuitState (or similarly named type) owns every live Stream produced or consumed while the circuit runs/steps.
The public construction API stays simple, functional, and composable; avoid over‑engineering while keeping the door open for richer builders later (e.g. n‑ary inputs, joining fan‑outs).
Current state: a Circuit object wires the components together and stores step‑time state such as integrator memory, but individual components also keep their own streams, causing accidental side‑effects and tight coupling.
Target Architecture
```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

# Stream is the codebase's existing runtime stream class (not defined here).

# 1. Pure description layer

@dataclass
class Map:
    f: Callable

@dataclass
class Map2:
    f: Callable

@dataclass
class Delay:
    initial_value: Any

# composite wrappers (Integrate, Differentiate, StepResponse, Distinct)
# are similarly stripped of runtime fields

# 2. Runtime state layer (existing `CircuitState` extended as below)

@dataclass
class CircuitState:
    # Primary **flat** storage keyed by UUID for constant‑time access
    streams: Dict[str, Stream]  # StreamID → Stream buffer
    delays: Dict[str, Any]  # DelayID → last value

    # Secondary index to preserve hierarchy / enable GC & debugging
    index_by_subcircuit: Dict[str, List[str]]  # SubCircuitID → list(StreamID)

    # … additional bags for component‑specific scratch if needed
```
Rationale for Flat Storage + Secondary Index
O(1) look‑ups: runtime hot‑path operations (push/pop/peek) never recurse through a tree.
Rewrite‑friendly: compiler passes that splice, inline, or fuse sub‑circuits can leave state tables untouched; only the secondary index needs patching.
Debuggability & GC: the index_by_subcircuit map lets tools dump or garbage‑collect all streams belonging to a nested circuit in one shot.
Low overhead: avoids nested dict objects on the hot path; the index is updated only when circuits are (re)bound or optimized.
Future flexibility: if interactive UIs want a tree, they can materialize it from the flat tables using the index without affecting execution speed.
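To make the rationale above concrete, here is a minimal sketch of flat tables plus a secondary index. It is a simplified stand-in for CircuitState: plain lists replace the real Stream class, and the register and drop_subcircuit methods are hypothetical helpers, not an existing API.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CircuitState:
    # Flat primary table keyed by opaque stream ID (lists stand in for Stream).
    streams: dict[str, list[Any]] = field(default_factory = dict)
    # Secondary index: SubCircuitID -> IDs of the streams it owns.
    index_by_subcircuit: dict[str, list[str]] = field(default_factory = dict)

    def register(self, subcircuit_id: str, stream_id: str) -> None:
        """Hypothetical helper: add an empty stream and record its owner."""
        self.streams[stream_id] = []
        self.index_by_subcircuit.setdefault(subcircuit_id, []).append(stream_id)

    def drop_subcircuit(self, subcircuit_id: str) -> None:
        """Hypothetical helper: free all streams of one sub-circuit in one shot."""
        for stream_id in self.index_by_subcircuit.pop(subcircuit_id, []):
            self.streams.pop(stream_id, None)


state = CircuitState()
state.register("root/integrate", "s1")
state.register("root/integrate", "s2")
state.register("root", "s3")
state.drop_subcircuit("root/integrate")
```

Hot-path access stays a single dictionary lookup on streams; the index is touched only when streams are registered or a sub-circuit is freed.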
Why per‑component delay buffers (component_mem) instead of peeking stream history
Constant‑space guarantee – each Delay (or Integrate) holds exactly one value regardless of run length; streams remain append‑only and can drop data once consumed.
Local reasoning & optimization – the need for past data is explicit and operator‑local, so compiler passes can push, fuse, or cancel delays without hidden dependencies on stream history.
Simpler streams – streams stay simple FIFO buffers; they don’t need random‑access or reference‑counted history windows.
Fan‑out isolation – multiple downstream components can each require their own one‑tick memory without forcing the shared stream to retain multiple historical copies.
Easier GC – freeing a sub‑circuit simply deletes its entries from component_mem; no need for global bookkeeping to find which history slices are still referenced.
Optional future GC of history – if you later adopt time‑windowed retention (watermarks), delay buffers remain orthogonal; you can still discard stream events as soon as they’re emitted.
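A one-tick delay backed by a per-component memory table might look like the sketch below. The step_delay function and the dict-based component_mem shape are assumptions for illustration; the id field on Delay anticipates the ID plumbing described under Tasks.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen = True)
class Delay:
    id: str
    initial_value: Any


def step_delay(
    delay: Delay, component_mem: dict[str, Any], incoming: Any
) -> tuple[Any, dict[str, Any]]:
    """Emit the previous tick's value and return the updated memory table."""
    previous = component_mem.get(delay.id, delay.initial_value)
    # Exactly one slot per Delay, however long the circuit has been running.
    return previous, { **component_mem, delay.id: incoming }


delay = Delay(id = "d1", initial_value = 0)
mem: dict[str, Any] = {}
outputs = []
for x in [1, 2, 3]:
    out, mem = step_delay(delay, mem, x)
    outputs.append(out)
```

The stream itself never has to retain history: the only remembered value is the single entry under the delay's ID. A real executor might update that slot in place inside CircuitState.step() instead of copying the table on every tick.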
Traversability & Optimization Affordances
(Runtime state: flat primary tables + secondary index — see Runtime layer above)
The pure circuit graph must be easy to traverse (e.g. adjacency lists or parent/child collections) so that an offline optimizer can walk and rewrite it before execution.
Optimizer use‑cases to keep in mind (no implementation yet):
Annihilation: when Integrate and Differentiate meet, replace the pair with a no‑op.
Provide helper iteration utilities (pure functions) — for example, post_order(circuit) or rewrite(circuit, rule_fn) — so later compilation passes can be plugged in without OOP hierarchies.
Ensure component IDs remain stable across rewrites (e.g. keep opaque id field) to link pre‑ and post‑optimization graphs back to CircuitState.
Stream Connection & Nesting
When a pure circuit definition is bound to concrete Stream objects for execution:
Build an extended CircuitState from the root circuit plus a mapping of external input stream names → Stream objects
Each component instance receives opaque stream IDs generated by a helper such as allocate_stream().
Nested circuits: composing a CompositeCircuit inside another circuit should merge their description graphs while namespacing IDs to prevent collisions.
Current pragmatic solution: prepend the parent component’s id to every child’s id (e.g. f"{parent_id}/{child_id}").
Future improvement: introduce a NameScope monad (or similar FP construct) to encapsulate ID generation without global state.
Cursors / frontiers: store a small Cursor record inside CircuitState for each stream that tracks its logical time frontier. Leave placeholders (TODO: cursor advancement policy).
No sophisticated routing yet: we do not attempt channel reuse or advanced fusion in this refactor; leave TODO comments where richer allocation might live.
The goal is to keep binding a thin, functional pass that respects immutability (build new state objects rather than mutating global tables) and maintains a clear hand‑off point before the optimizer.
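The ID side of binding can be sketched in a few lines. allocate_stream is the helper name suggested above; its UUID-based body, plus scoped_id and bind_inputs, are assumptions made for illustration only.

```python
from uuid import uuid4


def allocate_stream() -> str:
    """Return a fresh opaque stream ID (assumed here to be a UUID hex string)."""
    return uuid4().hex


def scoped_id(parent_id: str, child_id: str) -> str:
    """Namespace a child ID under its parent, per the pragmatic scheme above."""
    return f"{parent_id}/{child_id}"


def bind_inputs(input_names: list[str]) -> dict[str, str]:
    """Hypothetical binding step: map each external input name to a new stream ID."""
    return { name: allocate_stream() for name in input_names }


binding = bind_inputs(["left", "right"])
nested_id = scoped_id("outer", scoped_id("inner", "delay"))
```

Each function returns a new value instead of writing to a shared table, which keeps the binding pass easy to run before or after an optimizer.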
Style
Name the definitions without "Def" suffix. Just name them what the name of the component is.
Tasks
Define ID plumbing
Add id: str = field(default_factory=lambda: uuid4().hex) to ComponentBase.
Strip stream fields from all dataclasses.
Extend CircuitState
Holds dictionaries for runtime streams and per‑component private memory.
Provide .step() that executes one logical tick using the structural circuit and mutates only this object.
Update builder functions
map(f, src) now returns a Map(f) object and registers src / dst stream IDs in state.
Keep function signatures minimal; postpone variadic helpers (e.g. map_n) but leave a TODO comment stub.
Refactor executors
Replace attribute access (component.out.push(val)) with look‑ups: state.streams[out_id].push(val).
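A minimal sketch of an executor that goes through state look-ups rather than component attributes follows. The Stream class is a stand-in that assumes only a push() method, and run_map with its in_id / out_id parameters is a hypothetical shape for the executor.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Stream:
    """Stand-in FIFO buffer; the real Stream is assumed to expose push()."""
    items: list[Any] = field(default_factory = list)

    def push(self, value: Any) -> None:
        self.items.append(value)


@dataclass
class CircuitState:
    streams: dict[str, Stream] = field(default_factory = dict)


@dataclass(frozen = True)
class Map:
    f: Callable[[Any], Any]


def run_map(component: Map, state: CircuitState, in_id: str, out_id: str) -> None:
    """Hypothetical executor: read and write streams by ID, only through state."""
    for value in state.streams[in_id].items:
        state.streams[out_id].push(component.f(value))


state = CircuitState({ "in": Stream([1, 2, 3]), "out": Stream() })
run_map(Map(lambda x: x * 10), state, "in", "out")
```

The Map component holds nothing but its function, so the same instance can be reused against different CircuitState objects.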
Update existing tests
Retain the current test suite and ensure it passes unchanged. You may update how circuits are constructed or run inside the tests to match the new API, but do not change the assertions. If a test fails, fix the refactored implementation—not the expected outcomes.