Skip to content

Instantly share code, notes, and snippets.

@kkirsche
Last active December 11, 2023 16:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kkirsche/5bd88fbdbc1b6878e2045fdb0473df37 to your computer and use it in GitHub Desktop.
Save kkirsche/5bd88fbdbc1b6878e2045fdb0473df37 to your computer and use it in GitHub Desktop.
Versioning Troubleshooting
#!/usr/bin/env python
# BEGIN: history_meta.py
# https://docs.sqlalchemy.org/en/14/_modules/examples/versioned_history/history_meta.html
"""Versioned mixin class and other utilities."""
from __future__ import annotations
import datetime
from collections.abc import Callable, Generator
from typing import TypeAlias, Any
from sqlalchemy import Column, create_engine
from sqlalchemy.testing import ne_
from sqlalchemy import ForeignKey
from sqlalchemy import Column, Integer, String
from sqlalchemy import DateTime
from sqlalchemy import event
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import Integer
from sqlalchemy import Table
from sqlalchemy import util
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import attributes, relationship
from sqlalchemy.testing.entities import ComparableEntity
from sqlalchemy.orm import mapper, Mapper, Session, sessionmaker
from sqlalchemy.types import TypeEngine
from sqlalchemy.orm import object_mapper, declarative_base
from sqlalchemy.orm.exc import UnmappedColumnError
from sqlalchemy.orm.relationships import RelationshipProperty
# Shorthand for "a Column of any type", used in the annotations of this module.
_Column: TypeAlias = Column[TypeEngine[Any]]
def col_references_table(col: _Column, table: Table) -> bool:
    """Return True when at least one foreign key on *col* references *table*.

    A column without foreign keys yields False.
    """
    return any(foreign_key.references(table) for foreign_key in col.foreign_keys)
def _is_versioning_col(col: _Column) -> bool:
    """Report whether *col* carries the ``"version_meta"`` key in its ``info``."""
    column_info = col.info
    return "version_meta" in column_info
def _history_mapper(local_mapper: Mapper) -> None:
    """Create the history mapper for the class mapped by ``local_mapper``.

    A ``<ClassName>History`` class is created dynamically and mapped either to
    a new ``<table>_history`` table (copies of the mapped table's columns plus
    ``version`` and ``changed`` columns) or, when the class shares its table
    with its parent mapper, to the parent's history table, which gets any
    additional columns appended.  The resulting mapper is stored on the mapped
    class as ``__history_mapper__``.  For a class with no inherited history
    mapper, a ``version`` column is also added to the mapped table and mapped
    as the ``version`` attribute.

    NOTE(review): on SQLAlchemy 1.4.x, reading ``iterate_properties`` appears
    to trigger configuration of all mappers.  Because this function runs while
    the class is still being mapped, a ``relationship()`` that names a class
    defined later in the module may fail to resolve at that point -- confirm
    against the installed SQLAlchemy version.
    """
    cls = local_mapper.class_
    # set the "active_history" flag
    # on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True
    super_mapper = local_mapper.inherits
    # Present only when a class earlier in the hierarchy was already processed.
    super_history_mapper = getattr(cls, "__history_mapper__", None)
    polymorphic_on = None
    # (local column key, referenced history column) pairs used to build the
    # ForeignKeyConstraint towards the parent history table.
    super_fks = []

    def _col_copy(col: _Column) -> _Column:
        # Copy a mapped column for use in the history table, remembering the
        # copy on the original under info["history_copy"] and clearing the
        # unique flag, defaults and autoincrement on the copy.
        orig = col
        col = col.copy()
        orig.info["history_copy"] = col
        col.unique = False
        col.default = col.server_default = None
        col.autoincrement = False
        return col

    properties = util.OrderedDict()
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        # The class has its own table: build a dedicated history table.
        cols = []
        version_meta = {"version_meta": True}  # add column.info to identify
        # columns specific to versioning
        for column in local_mapper.local_table.c:
            if _is_versioning_col(column):
                continue
            col = _col_copy(column)
            if super_mapper and col_references_table(column, super_mapper.local_table):
                # Column points at the parent table; pair it with the first
                # primary key column of the parent's history table.
                super_fks.append(
                    (
                        col.key,
                        list(super_history_mapper.local_table.primary_key)[0],
                    )
                )
            cols.append(col)
            if column is local_mapper.polymorphic_on:
                polymorphic_on = col
            orig_prop = local_mapper.get_property_by_column(column)
            # carry over column re-mappings
            if len(orig_prop.columns) > 1 or orig_prop.columns[0].key != orig_prop.key:
                properties[orig_prop.key] = tuple(
                    col.info["history_copy"] for col in orig_prop.columns
                )
        if super_mapper:
            super_fks.append(("version", super_history_mapper.local_table.c.version))
        # "version" stores the integer version id.  This column is
        # required.
        cols.append(
            Column(
                "version",
                Integer,
                primary_key=True,
                autoincrement=False,
                info=version_meta,
            )
        )
        # "changed" column stores the UTC timestamp of when the
        # history row was created.
        # This column is optional and can be omitted.
        cols.append(
            Column(
                "changed",
                DateTime,
                default=lambda: datetime.datetime.now(datetime.timezone.utc),
                info=version_meta,
            )
        )
        if super_fks:
            # zip(*pairs) splits the pairs into (local keys, referenced columns).
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))
        table = Table(
            local_mapper.local_table.name + "_history",
            local_mapper.local_table.metadata,
            *cols,
            schema=local_mapper.local_table.schema,
        )
    else:
        # single table inheritance. take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = _col_copy(column)
                super_history_mapper.local_table.append_column(col)
        table = None
    if super_history_mapper:
        bases = (super_history_mapper.class_,)
        if table is not None:
            # Map "changed" across both this history table and the parent's.
            properties["changed"] = (table.c.changed,) + tuple(
                super_history_mapper.attrs.changed.columns
            )
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})
    m = mapper(
        versioned_cls,
        table,
        inherits=super_history_mapper,
        polymorphic_on=polymorphic_on,
        polymorphic_identity=local_mapper.polymorphic_identity,
        properties=properties,
    )
    cls.__history_mapper__ = m
    if not super_history_mapper:
        # Root of the hierarchy: add the "version" counter to the mapped table.
        local_mapper.local_table.append_column(
            Column("version", Integer, default=1, nullable=False),
            replace_existing=True,
        )
        local_mapper.add_property("version", local_mapper.local_table.c.version)
        if cls.use_mapper_versioning:
            local_mapper.version_id_col = local_mapper.local_table.c.version
class Versioned:
    """Mixin whose subclasses get a history mapper built when they are mapped.

    The mapper factory exposed through ``__mapper_cls__`` creates the regular
    mapper and then hands it to ``_history_mapper``.
    """

    use_mapper_versioning = True  # changed by kkirsche to True
    """if True, also assign the version column to be tracked by the mapper"""

    __table_args__ = {"sqlite_autoincrement": True}
    """Use sqlite_autoincrement, to ensure unique integer values
    are used for new rows even for rows that have been deleted."""

    @declared_attr
    def __mapper_cls__(cls) -> Callable[..., Mapper]:
        def _map_with_history(mapped_cls, *args, **kwargs) -> Mapper:
            # Build the ordinary mapper first, then attach its history mapper.
            new_mapper = mapper(mapped_cls, *args, **kwargs)
            _history_mapper(new_mapper)
            return new_mapper

        return _map_with_history
def versioned_objects(iter_: Any) -> Generator[Any, None, None]:
    """Lazily yield the items of *iter_* that have a ``__history_mapper__``.

    Items are produced in their original order; everything else is skipped.
    """
    yield from (
        candidate for candidate in iter_ if hasattr(candidate, "__history_mapper__")
    )
def create_version(obj: Any, session: Session, deleted: bool = False) -> None:
    """Add a history row for ``obj`` to ``session`` if it changed or is deleted.

    For every non-versioning column of the history tables, the attribute
    history of the matching mapped attribute is read to collect the value to
    store.  If no column-based attribute changed, relationships with pending
    changes whose local columns carry foreign keys also count as a change.
    When nothing changed and ``deleted`` is false the function returns without
    side effects; otherwise a history object is populated (including the
    current ``obj.version``), added to ``session``, and ``obj.version`` is
    incremented.

    :param obj: mapped instance whose class has a ``__history_mapper__``.
    :param session: session the history object is added to.
    :param deleted: when true, write a history row even if nothing changed.
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_
    obj_state = attributes.instance_state(obj)
    # Attribute values to copy onto the new history object, keyed by attribute.
    attr = {}
    obj_changed = False
    # Walk the object's mapper hierarchy and the history hierarchy in lockstep.
    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue
        for hist_col in hm.local_table.c:
            if _is_versioning_col(hist_col):
                continue
            obj_col = om.local_table.c[hist_col.key]
            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column. this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module.
                continue
            # expired object attributes and also deferred cols might not
            # be in the dict. force it to load no matter what by
            # using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)
            # a / u / d: added, unchanged and deleted values of the attribute.
            a, u, d = attributes.get_history(obj, prop.key)
            if d:
                # A previous value was replaced: record the old value.
                attr[prop.key] = d[0]
                obj_changed = True
            elif u:
                attr[prop.key] = u[0]
            elif a:
                # if the attribute had no value.
                attr[prop.key] = a[0]
                obj_changed = True
    if not obj_changed:
        # not changed, but we have relationships. OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if (
                isinstance(prop, RelationshipProperty)
                and attributes.get_history(
                    obj, prop.key, passive=attributes.PASSIVE_NO_INITIALIZE
                ).has_changes()
            ):
                # Only relationships whose local columns hold a foreign key
                # are treated as a change to this object.
                for p in prop.local_columns:
                    if p.foreign_keys:
                        obj_changed = True
                        break
                if obj_changed is True:
                    break
    if not obj_changed and not deleted:
        return
    attr["version"] = obj.version
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    obj.version += 1
def versioned_session(session: Session) -> None:
    """Attach a ``before_flush`` listener to *session* that writes history rows.

    On each flush the listener calls ``create_version`` for every versioned
    object in ``session.dirty`` and then, with ``deleted=True``, for every
    versioned object in ``session.deleted``.
    """

    def before_flush(session: Session, flush_context: Any, instances: Any) -> None:
        for dirty_obj in versioned_objects(session.dirty):
            create_version(dirty_obj, session)
        for removed_obj in versioned_objects(session.deleted):
            create_version(removed_obj, session, deleted=True)

    event.listen(session, "before_flush", before_flush)
# END: history_meta.py
# BEGIN: Kevin's code
# Declarative base class that the example models in this script derive from.
Base = declarative_base()
class RequestsExceptions(Base, ComparableEntity):
    """Model for the ``requests_exceptions`` table pairing a request with an exception."""

    __tablename__ = "requests_exceptions"
    id = Column(Integer, primary_key=True)
    # Both link columns are declared with a ForeignKey only (no explicit type).
    request_id = Column(ForeignKey("requests.id"))
    exception_id = Column(ForeignKey("exceptions.id"))
class SecurityException(Versioned, Base, ComparableEntity):
    """Versioned model mapped to the ``exceptions`` table."""

    __tablename__ = "exceptions"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    # Many-to-many link to Request through the "requests_exceptions" table.
    # NOTE(review): "Request" is given as a string because that class is
    # defined further down.  The Versioned mapper hook appears to force mapper
    # configuration while this class is still being defined, i.e. before
    # "Request" exists -- confirm whether that is what raises the
    # "failed to locate a name ('Request')" error for this script.
    requests = relationship(
        "Request", back_populates="exceptions", secondary="requests_exceptions"
    )
class Request(Versioned, Base, ComparableEntity):
    """Versioned model mapped to the ``requests`` table."""

    __tablename__ = "requests"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    # Many-to-many link to SecurityException through "requests_exceptions";
    # the reverse side is SecurityException.requests.
    exceptions = relationship(
        "SecurityException",
        back_populates="requests",
        secondary="requests_exceptions",
    )
def main() -> None:
    """Check that a deleted Request's id is not reused by a new Request.

    Creates an in-memory SQLite database, stores a Request linked to a
    SecurityException, deletes the Request, inserts a second Request and then
    asserts that the second Request's id differs from the id recorded in the
    single history row written for the deleted one.
    """
    engine = create_engine("sqlite:///:memory:", future=True)
    Base.metadata.create_all(engine)
    session_factory = sessionmaker(engine)
    # Install the before_flush hook; without it no history rows are written
    # and the ``.one()`` lookup below cannot find its row.
    versioned_session(session_factory)
    # A sessionmaker is a factory, not a context manager: call it to obtain
    # the Session that the ``with`` block manages.
    with session_factory() as sess:
        req = Request(name="Example Request")
        exc = SecurityException(name="Example Exception")
        req.exceptions.append(exc)
        sess.add(req)
        sess.add(exc)
        sess.commit()
        sess.delete(req)
        sess.commit()
        req2 = Request(name="sc2")
        sess.add(req2)
        sess.commit()
        RequestHistory = Request.__history_mapper__.class_
        # only one entry should exist in the history table; one()
        # ensures that
        reqdeleted = sess.query(RequestHistory).one()
        # If req2 has the same id that the deleted req had,
        # it will fail when modified or deleted
        # because of the violation of the uniqueness of the primary key on
        # requests_history
        ne_(req2.id, reqdeleted.id)
# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    main()
SQLAlchemy==1.4.50
typing_extensions==4.9.0
❯ python main.py
Traceback (most recent call last):
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/clsregistry.py", line 393, in _resolve_name
rval = d[token]
~^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/util/_collections.py", line 746, in __missing__
self[key] = val = self.creator(key)
^^^^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/clsregistry.py", line 372, in _access_cls
return self.fallback[key]
~~~~~~~~~~~~~^^^^^
KeyError: 'Request'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/kkirsche/git/python/test/main.py", line 304, in <module>
class SecurityException(Versioned, Base, ComparableEntity):
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_api.py", line 76, in __init__
_as_declarative(reg, cls, dict_)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_base.py", line 126, in _as_declarative
return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_base.py", line 183, in setup_mapping
return cfg_cls(registry, cls_, dict_, table, mapper_kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_base.py", line 335, in __init__
self._early_mapping(mapper_kw)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_base.py", line 215, in _early_mapping
self.map(mapper_kw)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/decl_base.py", line 1047, in map
mapper_cls(self.cls, self.local_table, **self.mapper_args),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/V832272/git/python/test/main.py", line 185, in map_
_history_mapper(mp)
File "/Users/kkirsche/git/python/test/main.py", line 50, in _history_mapper
for prop in local_mapper.iterate_properties:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 2072, in iterate_properties
self._check_configure()
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 1941, in _check_configure
_configure_registries({self.registry}, cascade=True)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 3527, in _configure_registries
_do_configure_registries(registries, cascade)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 3566, in _do_configure_registries
mapper._post_configure_properties()
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/mapper.py", line 1958, in _post_configure_properties
prop.init()
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/interfaces.py", line 231, in init
self.do_init()
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/relationships.py", line 2150, in do_init
self._process_dependent_arguments()
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/relationships.py", line 2245, in _process_dependent_arguments
self.target = self.entity.persist_selectable
^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 1113, in __get__
obj.__dict__[self.__name__] = result = self.fget(obj)
^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/relationships.py", line 2112, in entity
argument = self._clsregistry_resolve_name(self.argument)()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/clsregistry.py", line 397, in _resolve_name
self._raise_for_name(name, err)
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/orm/clsregistry.py", line 375, in _raise_for_name
util.raise_(
File "/Users/kkirsche/git/python/test/venv/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
sqlalchemy.exc.InvalidRequestError: When initializing mapper mapped class SecurityException->exceptions, expression 'Request' failed to locate a name ('Request'). If this is a class name, consider adding this relationship() to the <class '__main__.SecurityException'> class after both dependent classes have been defined.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment