import contextlib
import functools

from alembic import script
import fixtures
import mock
from oslo_concurrency.fixture import lockutils as concurrency
from oslo_config import fixture as config_fixture
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import test_fixtures
from oslo_db.sqlalchemy import test_migrations
from oslo_log import log as logging
from oslotest import base as test_base
import testtools

from placement import conf
from placement.db.sqlalchemy import migration
from placement.db.sqlalchemy import models
from placement import db_api

"""
All the magic in the OpportunistDBMixin was making it very hard to figure
out what was going on with test failures. I had figured out that at least
some of the time a different database name was being used, and this may
have been confusing the use of the [placement_database]/connection. So
I started trying to scale things back to something that actually works.
This now passes in a way that I expect it to but it has some caveats:
* It uses its own checking to see if the required database is around.
If not, it calls skipTest.
* Because it always uses the same database name, there are concurrency
problems which have to be guarded with a lock.
* That lock currently live in /tmp but presumably that's just not good
enough.
I suspect the magic available from the oslo_db fixtures is very handy
if you grok how it works and if you are doing something normal with
your db, but that's not what is going on here. The placement context
manager is a bit different in how it configures itself, thus not normal.
And I'm _really_ struggling to tease out the relationships between the
vast numbers of classes involved in oslo_db..test_{fixtures,migrations}.
"""
CONF = conf.CONF
DB_NAME = 'openstack_citest'
LOG = logging.getLogger(__name__)
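
# NOTE: These tests assume a database server reachable with the stock
# opportunistic credentials and an existing 'openstack_citest' database;
# if it is not there, the tests skip. A typical local MySQL setup (an
# assumption, not something this module performs) looks roughly like:
#     CREATE USER 'openstack_citest'@'localhost'
#         IDENTIFIED BY 'openstack_citest';
#     GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'localhost';
#     CREATE DATABASE openstack_citest;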


@contextlib.contextmanager
def patch_with_engine(engine):
    with mock.patch.object(enginefacade.writer,
                           'get_engine') as patch_engine:
        patch_engine.return_value = engine
        yield


def generate_url(driver):
    """Make a database URL to be used with the opportunistic tests."""
    # NOTE(cdent): Because of the way we need to configure
    # [placement_database]/connection, we need to have a predictable
    # database URL.
    backend = provision.BackendImpl.impl(driver)
    url = backend.create_opportunistic_driver_url()
    # Strip any trailing database name (or trailing '/') so that our own
    # predictable name can be appended.
    url = url[:url.rindex('/')]
    url = url + '/' + DB_NAME
    return url
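
# NOTE: Illustration only (exact values depend on how oslo_db's
# opportunistic credentials are set up locally); for the mysql driver the
# generated URL is expected to look something like:
#     mysql+pymysql://openstack_citest:openstack_citest@localhost/openstack_citest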


class WalkVersionsMixin(object):

    def _walk_versions(self, engine=None, alembic_cfg=None):
        # Determine the latest version script from the repo, then upgrade
        # from the base revision through to the latest, with no data in
        # the databases. This just checks that the schema itself upgrades
        # successfully.

        # Place the database under version control
        with patch_with_engine(engine):
            script_directory = script.ScriptDirectory.from_config(alembic_cfg)
            self.assertIsNone(self.migration_api.version(alembic_cfg))
            versions = [ver for ver in script_directory.walk_revisions()]
            for version in reversed(versions):
                self._migrate_up(engine, alembic_cfg,
                                 version.revision, with_data=True)

    def _migrate_up(self, engine, config, version, with_data=False):
        """Migrate up to a new version of the db.

        We allow for data insertion and post checks at every
        migration version with special _pre_upgrade_### and
        _check_### functions in the main test.
        """
        # NOTE(sdague): try block is here because it's impossible to debug
        # where a failed data migration happens otherwise
        try:
            if with_data:
                data = None
                pre_upgrade = getattr(
                    self, "_pre_upgrade_%s" % version, None)
                if pre_upgrade:
                    data = pre_upgrade(engine)
            self.migration_api.upgrade(version, config=config)
            self.assertEqual(version, self.migration_api.version(config))
            if with_data:
                check = getattr(self, "_check_%s" % version, None)
                if check:
                    check(engine, data)
        except Exception:
            LOG.error("Failed to migrate to version %(version)s on engine "
                      "%(engine)s",
                      {'version': version, 'engine': engine})
            raise


class MigrationCheckersMixin(object):

    def setUp(self):
        conf_fixture = self.useFixture(config_fixture.Config(CONF))
        # find the right path for this
        conf_fixture.config(lock_path='/tmp', group='oslo_concurrency')
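        # NOTE: a possible alternative to the hard-coded /tmp (a sketch
        # only, not what this module currently does): point lock_path at a
        # per-test temporary directory using the fixtures library.
        #     lock_dir = self.useFixture(fixtures.TempDir()).path
        #     conf_fixture.config(lock_path=lock_dir,
        #                         group='oslo_concurrency')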
        self.useFixture(concurrency.LockFixture('test_mig'))
        url = generate_url(self.DRIVER)
        conf_fixture.config(group='placement_database', connection=url)
        db_api.configure(CONF)
        try:
            self.engine = db_api.get_placement_engine()
        except db_exc.DBNonExistentDatabase:
            self.skipTest('%s not available' % self.DRIVER)
        self.config = migration._alembic_config()
        self.migration_api = migration
        super(MigrationCheckersMixin, self).setUp()
        backend = provision.Backend(self.engine.name, self.engine.url)
        self.addCleanup(
            functools.partial(backend.drop_all_objects, self.engine))
        self.addCleanup(self.engine.dispose)

    def test_walk_versions(self):
        self._walk_versions(self.engine, self.config)

    # # Leaving this here as a sort of template for when we do migration
    # # tests.
    # def _check_fb3f10dd262e(self, engine, data):
    #     nodes_tbl = db_utils.get_table(engine, 'nodes')
    #     col_names = [column.name for column in nodes_tbl.c]
    #     self.assertIn('fault', col_names)
    #     self.assertIsInstance(nodes_tbl.c.fault.type,
    #                           sqlalchemy.types.String)
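    # # A hypothetical companion hook (not part of the original): whatever
    # # a _pre_upgrade_### method returns is handed to the matching
    # # _check_### method as `data` by _migrate_up above.
    # def _pre_upgrade_fb3f10dd262e(self, engine):
    #     nodes_tbl = db_utils.get_table(engine, 'nodes')
    #     engine.execute(nodes_tbl.insert().values(id=1))
    #     return {'node_id': 1}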

    def test_upgrade_and_version(self):
        self.migration_api.upgrade('head')
        self.assertIsNotNone(self.migration_api.version())

    def test_upgrade_twice(self):
        # Start with the initial revision
        self.migration_api.upgrade('158782c7f38c')
        v1 = self.migration_api.version()
        # Now upgrade to head
        self.migration_api.upgrade('head')
        v2 = self.migration_api.version()
        self.assertNotEqual(v1, v2)


class TestMigrationsMySQL(MigrationCheckersMixin,
                          WalkVersionsMixin,
                          test_base.BaseTestCase):
    DRIVER = 'mysql'


class TestMigrationsPostgresql(MigrationCheckersMixin,
                               WalkVersionsMixin,
                               test_base.BaseTestCase):
    DRIVER = 'postgresql'


class ModelsMigrationSyncMixin(object):

    def setUp(self):
        conf_fixture = self.useFixture(config_fixture.Config(CONF))
        conf_fixture.config(lock_path='/tmp', group='oslo_concurrency')
        self.useFixture(concurrency.LockFixture('test_mig'))
        url = generate_url(self.DRIVER)
        # Log the URL rather than writing debug output to stderr.
        LOG.debug('Using database URL %s for %s tests', url, self.DRIVER)
        conf_fixture.config(group='placement_database', connection=url)
        db_api.configure(CONF)
        super(ModelsMigrationSyncMixin, self).setUp()

    def get_metadata(self):
        return models.BASE.metadata

    def get_engine(self):
        try:
            return db_api.get_placement_engine()
        except db_exc.DBNonExistentDatabase:
            self.skipTest('%s not available' % self.DRIVER)

    def db_sync(self, engine):
        migration.upgrade('head')


class ModelsMigrationsSyncMysql(ModelsMigrationSyncMixin,
                                test_migrations.ModelsMigrationsSync,
                                test_base.BaseTestCase):
    DRIVER = 'mysql'


class ModelsMigrationsSyncPostgresql(ModelsMigrationSyncMixin,
                                     test_migrations.ModelsMigrationsSync,
                                     test_base.BaseTestCase):
    DRIVER = 'postgresql'