Skip to content

Instantly share code, notes, and snippets.

@scolby33
Created October 4, 2019 19:48
Show Gist options
  • Save scolby33/e09624839a2b422312113d22b4e56da5 to your computer and use it in GitHub Desktop.
Save scolby33/e09624839a2b422312113d22b4e56da5 to your computer and use it in GitHub Desktop.
Creating Namespaced/Gapless Sequences Using Event Listeners
"""
Non-working example of namespaced sequences using event listeners.
Submitted as example code to the SQLAlchemy mailing list.
"""
import sqlite3
import sys
from typing import Collection, List, Optional, Union
from sqlalchemy import (
create_engine,
event,
Column,
Integer,
ForeignKey,
UniqueConstraint,
)
from sqlalchemy.engine import Engine
import sqlalchemy.ext.declarative.api
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, Session
from sqlalchemy.orm.session import UOWTransaction
from sqlalchemy.pool import Pool
import sqlalchemy.pool.base
# Declarative base class that every mapped model in this module inherits from;
# its ``metadata`` collects the tables those models define.
Base: sqlalchemy.ext.declarative.api.DeclarativeMeta = declarative_base()
class Parent(Base):
    """Owner row stored in the ``parent`` table.

    Its only column is the integer primary key ``parent_id``.
    """

    __tablename__ = "parent"

    parent_id = Column(Integer, primary_key=True)
class Child(Base):
    """Row in the ``child`` table, numbered within the parent it belongs to.

    ``child_number`` is required, and the pair ``(parent_id, child_number)``
    is constrained to be unique.
    """

    __tablename__ = "child"
    # A given child number may appear at most once under each parent.
    __table_args__ = (UniqueConstraint("parent_id", "child_number"),)

    child_id = Column(Integer, primary_key=True)
    parent_id = Column(
        Integer, ForeignKey("parent.parent_id", ondelete="cascade"), nullable=False
    )
    child_number = Column(Integer, nullable=False)

    parent = relationship("Parent")
class ChildNumberSequence(Base):
    """Counter row recording the last child number handed out for a parent."""

    __tablename__ = "child_number_sequence"

    parent = relationship("Parent")

    # The foreign key to the parent is also the primary key, so each parent
    # can have at most one counter row.
    parent_id = Column(
        Integer, ForeignKey("parent.parent_id", ondelete="cascade"), primary_key=True
    )
    # The database fills in 0 when no value is supplied on insert.
    last_child_number = Column(Integer, nullable=False, server_default="0")
def insert_child_number_sequence(
    session: Session,
    _flush_context: UOWTransaction,
    _instances: Optional[Collection[Base]],
) -> None:
    """Queue a ``ChildNumberSequence`` for every ``Parent`` pending in *session*.

    The signature matches a ``before_flush`` session listener; the flush
    context and instance collection are accepted but not used.
    """
    pending_parents = [obj for obj in session.new if isinstance(obj, Parent)]
    for pending_parent in pending_parents:
        session.add(ChildNumberSequence(parent=pending_parent))
def set_child_number(
    session: Session,
    _flush_context: UOWTransaction,
    _instances: Optional[Collection[Base]],
) -> None:
    """Assign the next per-parent ``child_number`` to every pending ``Child``.

    The signature matches a ``before_flush`` session listener; the flush
    context and instance collection are accepted but not used.

    For each new ``Child`` the counter row of its parent is located, its
    ``last_child_number`` is incremented in Python, and the new value is
    stored on the child.

    Raises:
        sqlalchemy.orm.exc.NoResultFound: if a child's parent has no
            ``ChildNumberSequence`` row, neither pending nor in the database.
    """
    # Counter rows that are themselves still pending (their parent is being
    # created in this same flush) cannot be found by querying the database,
    # so index them by parent object up front.
    sequences_by_parent = {
        obj.parent: obj
        for obj in session.new
        if isinstance(obj, ChildNumberSequence)
    }

    for instance in session.new:
        if not isinstance(instance, Child):
            continue

        sequence = sequences_by_parent.get(instance.parent)
        if sequence is None:
            # SELECT ... FOR UPDATE locks the counter row on backends that
            # support it, so concurrent transactions serialize on the
            # increment instead of handing out the same number twice.
            sequence = (
                session.query(ChildNumberSequence)
                .filter_by(parent=instance.parent)
                .with_for_update()
                .one()
            )
            # Reuse the row for further children of the same parent.
            sequences_by_parent[instance.parent] = sequence

        # Compute the value in Python. Assigning the class-level expression
        # ``ChildNumberSequence.last_child_number + 1`` would leave a SQL
        # expression (not an integer) on the counter, and copying that onto
        # the child would make the child INSERT reference another table.
        # A pending counter row has no value yet (the server default has not
        # been applied), hence the ``or 0``.
        next_number = (sequence.last_child_number or 0) + 1
        sequence.last_child_number = next_number
        instance.child_number = next_number
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(
    dbapi_connection,
    _connection_record: sqlalchemy.pool.base._ConnectionRecord,
) -> None:
    """Enable foreign_keys in SQLite.

    Registered for the ``connect`` event of every ``Engine``; connections
    that are not ``sqlite3.Connection`` objects are left untouched.

    Args:
        dbapi_connection: The raw DBAPI connection that was just opened.
        _connection_record: Pool bookkeeping record for the connection (unused).
    """
    if not isinstance(dbapi_connection, sqlite3.Connection):
        return

    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Release the cursor even if the PRAGMA statement raises.
        cursor.close()
def setup_session_listeners(session: Union[Session, sessionmaker]) -> None:
    """Set up event listeners for session. Call with your session/sessionmaker before using it.

    Args:
        session: The ``Session`` instance or ``sessionmaker`` factory to
            attach the ``before_flush`` listeners to.
    """
    # Registration order is kept as-is: the listener that creates counter
    # rows for new parents is attached before the one that consumes them.
    for listener in (insert_child_number_sequence, set_child_number):
        event.listen(session, "before_flush", listener)
def main(argv: List[str]):
    """Run the demonstration scenario and check it with assertions.

    Args:
        argv: Command-line arguments; ``argv[1]``, when present, is used as
            the database URL. Otherwise an in-memory SQLite database is used.
    """
    url = argv[1] if len(argv) >= 2 else "sqlite:///:memory:"
    engine = create_engine(url, echo=True)
    Base.metadata.create_all(engine)

    # Listeners are attached to the factory before any session is created.
    session_factory = sessionmaker(bind=engine)
    setup_session_listeners(session_factory)
    session = session_factory()

    # First parent: its counter row should exist and start at 0.
    first_parent = Parent()
    session.add(first_parent)
    session.commit()

    first_sequence = session.query(ChildNumberSequence).get(first_parent.parent_id)
    assert first_sequence.parent_id == first_parent.parent_id
    assert first_sequence.last_child_number == 0

    # Children of the first parent are expected to be numbered 1, 2.
    first_child = Child(parent=first_parent)
    session.add(first_child)
    session.commit()

    assert first_child.child_number == 1
    assert first_sequence.last_child_number == 1

    second_child = Child(parent=first_parent)
    session.add(second_child)
    session.commit()

    assert second_child.child_number == 2
    assert first_sequence.last_child_number == 2

    # Second parent: numbering is expected to start over, independently.
    second_parent = Parent()
    session.add(second_parent)
    session.commit()

    second_sequence = session.query(ChildNumberSequence).get(second_parent.parent_id)
    assert second_sequence.parent_id == second_parent.parent_id
    assert second_sequence.last_child_number == 0

    other_child = Child(parent=second_parent)
    session.add(other_child)
    session.commit()

    assert other_child.child_number == 1
    assert second_sequence.last_child_number == 1
if __name__ == "__main__":
    # Script entry point: pass the command line to main() and exit with
    # whatever it returns.
    raise SystemExit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment