Skip to content

Instantly share code, notes, and snippets.

@wonderbeyond
Last active June 21, 2023 02:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wonderbeyond/246d767029227e138cbf20a756f6983b to your computer and use it in GitHub Desktop.
Save wonderbeyond/246d767029227e138cbf20a756f6983b to your computer and use it in GitHub Desktop.
[SQLAlchemy] A Task-Step model that can describe steps' upstream and downstream relationships.
"""
A Task-Step model that can describe steps' upstream and downstream relationships.
Also refer to https://docs.sqlalchemy.org/en/20/orm/join_conditions.html#self-referential-many-to-many-relationship
"""
from typing import Annotated, List
import sqlalchemy as sa
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship, backref
from sqlalchemy.orm import MappedAsDataclass, DeclarativeBase
from sqlalchemy.ext.asyncio import AsyncAttrs
# Reusable annotated column types for the declarative models in this module.
# `intpk`: an `int` column declared with `primary_key=True`.
intpk = Annotated[int, mapped_column(primary_key=True)]
# `str128`: a `str` column stored as `sa.String(128)`.
str128 = Annotated[str, mapped_column(sa.String(128))]
class Base(
    AsyncAttrs,
    MappedAsDataclass,
    DeclarativeBase,
    init=False,
    repr=False,
):
    """Declarative base class for the models in this module.

    Mixes in ``AsyncAttrs`` (which provides ``awaitable_attrs``) and
    ``MappedAsDataclass``; the dataclass-generated ``__init__`` and
    ``__repr__`` are switched off via ``init=False`` / ``repr=False``.
    """
class Task(Base):
    """A named task, stored in the ``task`` table, together with its steps."""

    __tablename__ = "task"
    # Integer primary key (declared through the `intpk` annotated type).
    id: Mapped[intpk]
    # Task name, a string column of length 128 (declared through `str128`).
    name: Mapped[str128]
    # Steps that belong to this task; the other side of this relationship is
    # `Step.task`. With the "all, delete-orphan" cascade, session operations on
    # a task propagate to its steps, and a step removed from this collection is
    # deleted rather than left without a task.
    steps: Mapped[List["Step"]] = relationship(back_populates="task", cascade="all, delete-orphan")
# Association ("secondary") table for the many-to-many upstream/downstream
# relationship between steps: a row (up_id, down_id) states that step `up_id`
# is upstream of step `down_id`. The pair forms the composite primary key, so
# the same dependency cannot be stored twice.
#
# The column types are intentionally omitted: when a Column has a ForeignKey
# and no explicit type, SQLAlchemy copies the type of the referenced column
# once the foreign key is resolved. This keeps both columns identical in type
# to `step.id`; a hard-coded BigInteger here did not match the Integer primary
# key and is rejected by backends that require matching integer FK types.
step_dep = sa.Table(
    "step_dep",
    Base.metadata,
    sa.Column("up_id", ForeignKey("step.id"), primary_key=True),
    sa.Column("down_id", ForeignKey("step.id"), primary_key=True),
)
class Step(Base):
    """A step of a task, linked to its upstream and downstream steps.

    Dependencies between steps are stored in the ``step_dep`` association
    table. ``ups`` and ``downs`` are the two directions of the same
    self-referential many-to-many relationship and are kept in sync with
    each other through ``back_populates``.
    """

    __tablename__ = "step"

    # Integer primary key (declared through the `intpk` annotated type).
    id: Mapped[intpk]
    # Foreign key to the owning task's primary key.
    task_id: Mapped[int] = mapped_column(ForeignKey("task.id"))
    # Step name, a string column of length 128 (declared through `str128`).
    name: Mapped[str128]

    # The task this step belongs to; the other side is `Task.steps`.
    task: Mapped["Task"] = relationship(back_populates="steps")

    # Upstream steps: rows of `step_dep` where this step is the `down_id`
    # side, joined to the step on the `up_id` side.
    ups: Mapped[List["Step"]] = relationship(
        "Step",
        secondary="step_dep",
        primaryjoin="Step.id == step_dep.c.down_id",
        secondaryjoin="Step.id == step_dep.c.up_id",
        back_populates="downs",
    )
    # Downstream steps: the mirror image of `ups` (join conditions swapped).
    # Declared explicitly instead of via `backref` so the attribute is visible
    # in the class body and to type checkers.
    downs: Mapped[List["Step"]] = relationship(
        "Step",
        secondary="step_dep",
        primaryjoin="Step.id == step_dep.c.up_id",
        secondaryjoin="Step.id == step_dep.c.down_id",
        back_populates="ups",
    )
async def test_step_dep():
    """Exercise the upstream/downstream link between two steps of one task.

    Creates a task with two steps where the first is upstream of the second,
    checks both directions after a commit, then clears the link from the
    upstream side and checks both directions again, before and after a commit.
    """
    async with adb.Session() as session:
        task = Task(name="task-2")
        session.add_all([task])

        first = Step(name="step-1")
        task.steps.append(first)
        second = Step(name="step-2", ups=[first])
        task.steps.append(second)

        async def _reload():
            # Re-read both rows, then load each relationship collection
            # through `awaitable_attrs` so the plain attribute reads in the
            # assertions below do not need to load anything themselves.
            await session.refresh(first)
            await session.refresh(second)
            await first.awaitable_attrs.downs
            await second.awaitable_attrs.ups

        await session.commit()
        await _reload()
        assert first.downs == [second]
        assert second.ups == [first]

        first.downs = []
        assert not second.ups  # `step2.ups` should be updated automatically

        await session.commit()
        await _reload()
        assert not first.downs
        assert not second.ups
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment