Created
October 4, 2019 19:48
-
-
Save scolby33/e09624839a2b422312113d22b4e56da5 to your computer and use it in GitHub Desktop.
Creating Namespaced/Gapless Sequences Using Event Listeners
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Non-working example of namespaced sequences using event listeners. | |
Submitted as example code to the SQLAlchemy mailing list. | |
""" | |
import sqlite3 | |
import sys | |
from typing import Collection, List, Optional, Union | |
from sqlalchemy import ( | |
create_engine, | |
event, | |
Column, | |
Integer, | |
ForeignKey, | |
UniqueConstraint, | |
) | |
from sqlalchemy.engine import Engine | |
import sqlalchemy.ext.declarative.api | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import relationship, sessionmaker, Session | |
from sqlalchemy.orm.session import UOWTransaction | |
from sqlalchemy.pool import Pool | |
import sqlalchemy.pool.base | |
Base: sqlalchemy.ext.declarative.api.DeclarativeMeta = declarative_base() | |
class Parent(Base): | |
__tablename__ = "parent" | |
parent_id = Column(Integer, primary_key=True) | |
class Child(Base): | |
__tablename__ = "child" | |
child_id = Column(Integer, primary_key=True) | |
parent_id = Column( | |
Integer, | |
ForeignKey("parent.parent_id", ondelete="cascade"), | |
nullable=False, | |
) | |
child_number = Column(Integer, nullable=False) | |
parent = relationship("Parent") | |
__table_args__ = (UniqueConstraint("parent_id", "child_number"),) | |
class ChildNumberSequence(Base): | |
"""Table for keeping track of the next child number per parent.""" | |
__tablename__ = "child_number_sequence" | |
parent_id = Column( | |
Integer, | |
ForeignKey("parent.parent_id", ondelete="cascade"), | |
primary_key=True, | |
) | |
last_child_number = Column(Integer, nullable=False, server_default="0") | |
parent = relationship("Parent") | |
def insert_child_number_sequence( | |
session: Session, | |
_flush_context: UOWTransaction, | |
_instances: Optional[Collection[Base]], | |
) -> None: | |
"""Add a row to the child_number_sequence table for each new row in the parent table.""" | |
for instance in session.new: | |
if not isinstance(instance, Parent): | |
continue | |
child_number_sequence = ChildNumberSequence(parent=instance) | |
session.add(child_number_sequence) | |
def set_child_number( | |
session: Session, | |
_flush_context: UOWTransaction, | |
_instances: Optional[Collection[Base]], | |
) -> None: | |
"""Set the child_number for each new child based on the last_child_number in the child_number_sequence table.""" | |
for instance in session.new: | |
if not isinstance(instance, Child): | |
continue | |
child_number_sequence: ChildNumberSequence = session.query( | |
ChildNumberSequence | |
).filter_by(parent=instance.parent).one() | |
child_number_sequence.last_child_number = ( | |
ChildNumberSequence.last_child_number + 1 | |
) | |
instance.child_number = child_number_sequence.last_child_number | |
session.add(child_number_sequence) | |
@event.listens_for(Engine, "connect") | |
def set_sqlite_pragma( | |
dbapi_connection, | |
_connection_record: sqlalchemy.pool.base._ConnectionRecord, | |
) -> None: | |
"""Enable foreign_keys in SQLite.""" | |
if isinstance(dbapi_connection, sqlite3.Connection): | |
cursor = dbapi_connection.cursor() | |
cursor.execute("PRAGMA foreign_keys=ON") | |
cursor.close() | |
def setup_session_listeners(session: Session) -> None: | |
"""Set up event listeners for session. Call with your session/sessionmaker before using it.""" | |
event.listen(session, "before_flush", insert_child_number_sequence) | |
event.listen(session, "before_flush", set_child_number) | |
def main(argv: List[str]): | |
if len(argv) >= 2: | |
url = argv[1] | |
else: | |
url = "sqlite:///:memory:" | |
engine = create_engine(url, echo=True) | |
Base.metadata.create_all(engine) | |
session = sessionmaker(bind=engine) | |
setup_session_listeners(session) | |
session = session() | |
p1 = Parent() | |
session.add(p1) | |
session.commit() | |
child_number_sequence_1 = session.query(ChildNumberSequence).get( | |
p1.parent_id | |
) | |
assert child_number_sequence_1.parent_id == p1.parent_id | |
assert child_number_sequence_1.last_child_number == 0 | |
c1_1 = Child(parent=p1) | |
session.add(c1_1) | |
session.commit() | |
assert c1_1.child_number == 1 | |
assert child_number_sequence_1.last_child_number == 1 | |
c1_2 = Child(parent=p1) | |
session.add(c1_2) | |
session.commit() | |
assert c1_2.child_number == 2 | |
assert child_number_sequence_1.last_child_number == 2 | |
p2 = Parent() | |
session.add(p2) | |
session.commit() | |
child_number_sequence_2 = session.query(ChildNumberSequence).get( | |
p2.parent_id | |
) | |
assert child_number_sequence_2.parent_id == p2.parent_id | |
assert child_number_sequence_2.last_child_number == 0 | |
c2_1 = Child(parent=p2) | |
session.add(c2_1) | |
session.commit() | |
assert c2_1.child_number == 1 | |
assert child_number_sequence_2.last_child_number == 1 | |
if __name__ == "__main__": | |
sys.exit(main(sys.argv)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment