Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Perform database sync sanity check to SQLAlchemy models on application startup
import logging
from sqlalchemy import inspect
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker
from sqlalchemy.orm import RelationshipProperty
logger = logging.getLogger(__name__)
def is_sane_database(Base, session):
"""Check whether the current database matches the models declared in model base.
Currently we check that all tables exist with all columns. What is not checked
* Column types are not verified
* Relationships are not verified at all (TODO)
:param Base: Declarative Base for SQLAlchemy models to check
:param session: SQLAlchemy session bound to an engine
:return: True if all declared models have corresponding tables and columns.
"""
engine = session.get_bind()
iengine = inspect(engine)
errors = False
tables = iengine.get_table_names()
# Go through all SQLAlchemy models
for name, klass in Base._decl_class_registry.items():
if isinstance(klass, _ModuleMarker):
# Not a model
continue
table = klass.__tablename__
if table in tables:
# Check all columns are found
# Looks like [{'default': "nextval('sanity_check_test_id_seq'::regclass)", 'autoincrement': True, 'nullable': False, 'type': INTEGER(), 'name': 'id'}]
columns = [c["name"] for c in iengine.get_columns(table)]
mapper = inspect(klass)
for column_prop in mapper.attrs:
if isinstance(column_prop, RelationshipProperty):
# TODO: Add sanity checks for relations
pass
else:
for column in column_prop.columns:
# Assume normal flat column
if not column.key in columns:
logger.error("Model %s declares column %s which does not exist in database %s", klass, column.key, engine)
errors = True
else:
logger.error("Model %s declares table %s which does not exist in database %s", klass, table, engine)
errors = True
return not errors
"""Tests for checking database sanity checks functions correctly."""
from pyramid_web20.system.model.sanitycheck import is_sane_database
from sqlalchemy import engine_from_config, Column, Integer, String
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import ForeignKey
def setup_module(self):
# Quiet log output for the tests
import logging
from pyramid_web20.system.model.sanitycheck import logger
#logger.setLevel(logging.FATAL)
def gen_test_model():
Base = declarative_base()
class SaneTestModel(Base):
"""A sample SQLAlchemy model to demostrate db conflicts. """
__tablename__ = "sanity_check_test"
#: Running counter used in foreign key references
id = Column(Integer, primary_key=True)
return Base, SaneTestModel
def gen_relation_models():
Base = declarative_base()
class RelationTestModel(Base):
__tablename__ = "sanity_check_test_2"
id = Column(Integer, primary_key=True)
class RelationTestModel2(Base):
__tablename__ = "sanity_check_test_3"
id = Column(Integer, primary_key=True)
test_relationship_id = Column(ForeignKey("sanity_check_test_2.id"))
test_relationship = relationship(RelationTestModel, primaryjoin=test_relationship_id == RelationTestModel.id)
return Base, RelationTestModel, RelationTestModel2
def gen_declarative():
Base = declarative_base()
class DeclarativeTestModel(Base):
__tablename__ = "sanity_check_test_4"
id = Column(Integer, primary_key=True)
@declared_attr
def _password(self):
return Column('password', String(256), nullable=False)
@hybrid_property
def password(self):
return self._password
return Base, DeclarativeTestModel
def test_sanity_pass(ini_settings, dbsession):
"""See database sanity check completes when tables and columns are created."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Base, SaneTestModel = gen_test_model()
Session = sessionmaker(bind=engine)
session = Session()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
def test_sanity_table_missing(ini_settings, dbsession):
"""See check fails when there is a missing table"""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Base, SaneTestModel = gen_test_model()
Session = sessionmaker(bind=engine)
session = Session()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
assert is_sane_database(Base, session) is False
def test_sanity_column_missing(ini_settings, dbsession):
"""See check fails when there is a missing table"""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, SaneTestModel = gen_test_model()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])
# Delete one of the columns
engine.execute("ALTER TABLE sanity_check_test DROP COLUMN id")
assert is_sane_database(Base, session) is False
def test_sanity_pass_relationship(ini_settings, dbsession):
"""See database sanity check understands about relationships and don't deem them as missing column."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, RelationTestModel, RelationTestModel2 = gen_relation_models()
try:
Base.metadata.drop_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
def test_sanity_pass_declarative(ini_settings, dbsession):
"""See database sanity check understands about relationships and don't deem them as missing column."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, DeclarativeTestModel = gen_declarative()
try:
Base.metadata.drop_all(engine, tables=[DeclarativeTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[DeclarativeTestModel.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
@phuongcv1112

This comment has been minimized.

Copy link

commented May 7, 2019

Wonderful Script :D Thank you so much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.