Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created June 4, 2015 21:22
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miohtama/278fd4eeb9e5272d061c to your computer and use it in GitHub Desktop.
Save miohtama/278fd4eeb9e5272d061c to your computer and use it in GitHub Desktop.
Perform database sync sanity check to SQLAlchemy models on application startup
import logging
from sqlalchemy import inspect
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker
from sqlalchemy.orm import RelationshipProperty
logger = logging.getLogger(__name__)
def is_sane_database(Base, session):
"""Check whether the current database matches the models declared in model base.
Currently we check that all tables exist with all columns. What is not checked
* Column types are not verified
* Relationships are not verified at all (TODO)
:param Base: Declarative Base for SQLAlchemy models to check
:param session: SQLAlchemy session bound to an engine
:return: True if all declared models have corresponding tables and columns.
"""
engine = session.get_bind()
iengine = inspect(engine)
errors = False
tables = iengine.get_table_names()
# Go through all SQLAlchemy models
for name, klass in Base._decl_class_registry.items():
if isinstance(klass, _ModuleMarker):
# Not a model
continue
table = klass.__tablename__
if table in tables:
# Check all columns are found
# Looks like [{'default': "nextval('sanity_check_test_id_seq'::regclass)", 'autoincrement': True, 'nullable': False, 'type': INTEGER(), 'name': 'id'}]
columns = [c["name"] for c in iengine.get_columns(table)]
mapper = inspect(klass)
for column_prop in mapper.attrs:
if isinstance(column_prop, RelationshipProperty):
# TODO: Add sanity checks for relations
pass
else:
for column in column_prop.columns:
# Assume normal flat column
if not column.key in columns:
logger.error("Model %s declares column %s which does not exist in database %s", klass, column.key, engine)
errors = True
else:
logger.error("Model %s declares table %s which does not exist in database %s", klass, table, engine)
errors = True
return not errors
"""Tests for checking database sanity checks functions correctly."""
from pyramid_web20.system.model.sanitycheck import is_sane_database
from sqlalchemy import engine_from_config, Column, Integer, String
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import ForeignKey
def setup_module(self):
# Quiet log output for the tests
import logging
from pyramid_web20.system.model.sanitycheck import logger
#logger.setLevel(logging.FATAL)
def gen_test_model():
Base = declarative_base()
class SaneTestModel(Base):
"""A sample SQLAlchemy model to demostrate db conflicts. """
__tablename__ = "sanity_check_test"
#: Running counter used in foreign key references
id = Column(Integer, primary_key=True)
return Base, SaneTestModel
def gen_relation_models():
Base = declarative_base()
class RelationTestModel(Base):
__tablename__ = "sanity_check_test_2"
id = Column(Integer, primary_key=True)
class RelationTestModel2(Base):
__tablename__ = "sanity_check_test_3"
id = Column(Integer, primary_key=True)
test_relationship_id = Column(ForeignKey("sanity_check_test_2.id"))
test_relationship = relationship(RelationTestModel, primaryjoin=test_relationship_id == RelationTestModel.id)
return Base, RelationTestModel, RelationTestModel2
def gen_declarative():
Base = declarative_base()
class DeclarativeTestModel(Base):
__tablename__ = "sanity_check_test_4"
id = Column(Integer, primary_key=True)
@declared_attr
def _password(self):
return Column('password', String(256), nullable=False)
@hybrid_property
def password(self):
return self._password
return Base, DeclarativeTestModel
def test_sanity_pass(ini_settings, dbsession):
"""See database sanity check completes when tables and columns are created."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Base, SaneTestModel = gen_test_model()
Session = sessionmaker(bind=engine)
session = Session()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
def test_sanity_table_missing(ini_settings, dbsession):
"""See check fails when there is a missing table"""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Base, SaneTestModel = gen_test_model()
Session = sessionmaker(bind=engine)
session = Session()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
assert is_sane_database(Base, session) is False
def test_sanity_column_missing(ini_settings, dbsession):
"""See check fails when there is a missing table"""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, SaneTestModel = gen_test_model()
try:
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])
# Delete one of the columns
engine.execute("ALTER TABLE sanity_check_test DROP COLUMN id")
assert is_sane_database(Base, session) is False
def test_sanity_pass_relationship(ini_settings, dbsession):
"""See database sanity check understands about relationships and don't deem them as missing column."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, RelationTestModel, RelationTestModel2 = gen_relation_models()
try:
Base.metadata.drop_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
def test_sanity_pass_declarative(ini_settings, dbsession):
"""See database sanity check understands about relationships and don't deem them as missing column."""
engine = engine_from_config(ini_settings, 'sqlalchemy.')
conn = engine.connect()
trans = conn.begin()
Session = sessionmaker(bind=engine)
session = Session()
Base, DeclarativeTestModel = gen_declarative()
try:
Base.metadata.drop_all(engine, tables=[DeclarativeTestModel.__table__])
except sqlalchemy.exc.NoSuchTableError:
pass
Base.metadata.create_all(engine, tables=[DeclarativeTestModel.__table__])
try:
assert is_sane_database(Base, session) is True
finally:
Base.metadata.drop_all(engine)
@phuongcv1112
Copy link

Wonderful Script :D Thank you so much!

@aayla-secura
Copy link

Thank you. Do you mind if I include it in one of my PyPi packages (obviously crediting you and including a link to this gist)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment