-
-
Save naspeh/021bdc9b480c7efc9761 to your computer and use it in GitHub Desktop.
Test runner for django and postgres
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
datadiff # provide human-readable diffs of python data structures | |
django-nose==0.1.3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
TEST_RUNNER = 'testing.runner.TestSuiteRunner' | |
NOSE_PLUGINS = [ | |
'testing.runner.CommonWrapper', | |
'testing.runner.ReuseDbWrapper', | |
'testing.runner.MagicDbWrapper', | |
] | |
NOSE_ARGS = [ | |
'--with-common-wrap', | |
'--with-magic-db', | |
'--with-reuse-db' | |
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
You need to rewrite: | |
- `TestSuiteRunner.setup_databases` method | |
''' | |
import os | |
import traceback | |
import uuid | |
import warnings | |
from datadiff import diff | |
from django import db | |
from django.conf import settings | |
from django.core.management import call_command | |
from django.core.cache import get_cache | |
from django.test import TransactionTestCase | |
from django_nose.runner import NoseTestSuiteRunner | |
from nose.plugins import Plugin | |
from mock import patch | |
from redis import Redis | |
def psql(sql): | |
db.close_connection() | |
cursor = db.connection.cursor() | |
db.connection.creation._prepare_for_test_db_ddl() | |
sql = sql.rstrip(';').split(';') | |
for item in sql: | |
cursor.execute(item) | |
print(item) | |
db.close_connection() | |
class CommonWrapper(Plugin): | |
'''Wrapper for setup, teardown common actions''' | |
name = 'common-wrap' | |
def options(self, parser, env): | |
'''Register commandline options''' | |
Plugin.options(self, parser, env) | |
parser.add_option('--without-mock-http', | |
action='store_true', dest='without_mock_http', | |
help='Don\'t mock http requests' | |
) | |
parser.add_option('--enable-logging', | |
action='store_true', dest='enable_logging', | |
help='Enable all logging, not only WARN level' | |
) | |
def get_test_param(self, test, name): | |
test = test.test | |
func_name = test._testMethodName | |
func = getattr(test, func_name) | |
return getattr(func, name, None) | |
def configure(self, options, conf): | |
'''Configure plugin''' | |
Plugin.configure(self, options, conf) | |
self.without_mock_http = options.without_mock_http | |
self.enable_logging = options.enable_logging | |
def startTest(self, test): | |
self.test = test | |
test_param = self.get_test_param(test, 'without_mock_http') | |
if not (self.without_mock_http or test_param): | |
self.mock_http() | |
if not self.enable_logging: | |
import logging | |
logging.disable(logging.WARN) | |
def stopTest(self, test): | |
# clear redis databases | |
for conf in settings.REDIS.values(): | |
Redis(**conf).flushdb() | |
# clear caches | |
for key in settings.CACHES.keys(): | |
get_cache(key).clear() | |
def mock_func(self, name, orig_obj=None): | |
def mock_func(*args, **kwargs): | |
if orig_obj: | |
print('Mock %s(*%r, **%r) %s' % (name, args, kwargs, orig_obj)) | |
return orig_obj | |
assert False, '%r %r\n%s' % ( | |
args, kwargs, '\n'.join(traceback.format_stack()) | |
) | |
mock_obj = patch(name, mock_func) | |
mock_obj.start() | |
self.test.test.addCleanup(mock_obj.stop) | |
def mock_http(self): | |
self.mock_func('urllib2.urlopen') | |
self.mock_func('urllib2.build_opener') | |
self.mock_func('requests.api.request') # remove this, if unused requests | |
class ReuseDbWrapper(Plugin): | |
'''Reuse db from last run''' | |
name = 'reuse-db' | |
def options(self, parser, env): | |
'''Register commandline options''' | |
Plugin.options(self, parser, env) | |
parser.add_option('--create-db', | |
action='store_true', dest='with_db_create', | |
help='Create db before tests' | |
) | |
def configure(self, options, conf): | |
'''Configure plugin''' | |
Plugin.configure(self, options, conf) | |
if not self.enabled: | |
with_create = True | |
else: | |
with_create = options.with_db_create | |
if with_create is None: | |
with_create = getattr(settings, 'TEST_DB_CREATE', False) | |
with_create = os.environ.get('TEST_DB_CREATE', with_create) | |
settings.TEST_DB_CREATE = with_create | |
class MagicDbWrapper(Plugin): | |
'''Wrap each test with clear db, has two modes: | |
1. transaction mode (default mode); | |
2. unique db mode, enable: | |
class TestMain(TestCase): | |
_test_unique_db = True | |
''' | |
name = 'magic-db' | |
@property | |
def db_conf(self): | |
return TestSuiteRunner.db_conf | |
def prepareTestCase(self, test): | |
test = test.test | |
# don't use django wraps of database | |
if isinstance(test, TransactionTestCase): | |
test._fixture_setup = lambda: None | |
test._fixture_teardown = lambda: None | |
self.db_src = db_name = self.db_conf['NAME'] | |
assert db_name.startswith('test_'), db_name | |
if hasattr(test, '_test_db'): | |
db_name = '%s_%s' % (self.db_src, test._test_db) | |
self.db_conf['NAME'] = self.db_conf['TEST_NAME'] = db_name | |
if hasattr(test, '_test_unique_db'): | |
self.start_unique_db() | |
else: | |
self.start_transaction() | |
def afterTest(self, test): | |
if hasattr(test.test, '_test_unique_db'): | |
self.stop_unique_db() | |
else: | |
self.stop_transaction() | |
self.db_conf['NAME'] = self.db_conf['TEST_NAME'] = self.db_src | |
db.close_connection() | |
def start_unique_db(self): | |
src = self.db_conf['TEST_NAME'] | |
self.db_new = new = '%s_%s' % (src, uuid.uuid4().hex) | |
psql( | |
'DROP DATABASE IF EXISTS "{0}";' | |
'CREATE DATABASE "{0}" WITH TEMPLATE "{1}";' | |
.format(new, src) | |
) | |
self.db_conf['NAME'] = self.db_conf['TEST_NAME'] = new | |
def stop_unique_db(self): | |
self.db_conf['NAME'] = self.db_conf['TEST_NAME'] = self.db_src | |
psql('DROP DATABASE "{0}";'.format(self.db_new)) | |
def start_transaction(self): | |
self.db_expects = self.get_tables_count() | |
db.transaction.enter_transaction_management() | |
db.transaction.managed(True) | |
def stop_transaction(self): | |
db.transaction.rollback() | |
db.transaction.leave_transaction_management() | |
db_last = self.get_tables_count() | |
if self.db_expects != db_last: | |
db_diff = diff(self.db_expects, db_last) | |
warnings.warn( | |
'Has difference of database after test:\n %s' % db_diff, | |
RuntimeWarning | |
) | |
def get_tables_count(self): | |
cursor = db.connection.cursor() | |
cursor.execute( | |
'SELECT relname,n_live_tup ' | |
'FROM pg_stat_user_tables ' | |
'ORDER BY relname' | |
) | |
return cursor.fetchall() | |
class TestSuiteRunner(NoseTestSuiteRunner): | |
db_conf = settings.DATABASES['default'] | |
db_name = db_conf['NAME'] | |
def __init__(self, *args, **kwargs): | |
if not getattr(settings, 'TESTING', False): | |
raise SystemExit( | |
'Wrong settings, try again with --settings=settings.testing' | |
) | |
super(TestSuiteRunner, self).__init__(*args, **kwargs) | |
def reset_database(self, name, template='template0'): | |
psql( | |
'DROP DATABASE IF EXISTS "{0}";' | |
'CREATE DATABASE "{0}" WITH ENCODING=\'UTF8\' TEMPLATE {1};' | |
.format(name, template) | |
) | |
def set_test_databeses(self): | |
test_db_name = db.connection.creation._get_test_db_name() | |
self.db_conf['TEST_NAME'] = test_db_name | |
self.db_conf['NAME'] = test_db_name | |
return test_db_name | |
def setup_databases(self, *args, **kw): | |
test_db_name = self.set_test_databeses() | |
if not settings.TEST_DB_CREATE: | |
return | |
self.db_conf['NAME'] = self.db_name | |
self.reset_database(test_db_name) | |
self.db_conf['NAME'] = test_db_name | |
call_command('syncdb', | |
verbosity=max(self.verbosity - 1, 0), | |
interactive=False, | |
load_initial_data=False | |
) | |
call_command('flush', | |
verbosity=max(self.verbosity - 1, 0), | |
interactive=False | |
) | |
psql( | |
'create extension cube;' | |
'create extension earthdistance;' | |
) | |
# save clean template database | |
self.reset_database('%s_clean' % test_db_name, test_db_name) | |
# fill db with real data of gis | |
call_command('gis_import') | |
def teardown_databases(self, *args, **kw): | |
self.db_conf['NAME'] = self.db_name |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment