|
import collections |
|
import itertools |
|
import unittest |
|
|
|
from django.db import ( |
|
DEFAULT_DB_ALIAS, |
|
connections, |
|
transaction, |
|
) |
|
from django.test import testcases |
|
from django.test.simple import DjangoTestSuiteRunner |
|
from django.test.testcases import ( |
|
connections_support_transactions, |
|
nop, |
|
) |
|
|
|
import dhain_trie |
|
|
|
# Database aliases on which PrefixSuite takes savepoints and RootSuite
# enters/leaves transaction management. Only the default alias is listed.
DATABASES = [DEFAULT_DB_ALIAS]
|
|
|
def iter_isfirst(iterable):
    """
    Yield ``(item, is_first)`` pairs for every item of ``iterable``.

    ``is_first`` is True for the first item only and False for all the
    following ones. An empty iterable yields nothing.

    Based on [1] to indicate first-run too.

    [1] http://code.activestate.com/recipes/392015-finding-the-last-item-in-a-loop/
    """
    # enumerate() avoids calling the iterator's .next() by hand, which only
    # exists on Python 2 and lets StopIteration escape the generator body
    # when the iterable is empty.
    for index, item in enumerate(iterable):
        yield item, index == 0
|
|
|
def stripPrefix(seq, prefix):
    """
    Remove from the front of ``seq`` the leading items it shares with
    ``prefix`` and return them.

    ``seq`` is modified in place (it must support slice deletion, e.g. a
    list). Matching stops at the first differing item or when either
    sequence runs out.

    Returns the list of removed items (possibly empty), or None when
    ``seq`` or ``prefix`` is empty or None, in which case ``seq`` is left
    untouched.
    """
    if not seq or not prefix:
        return None

    # Walk both sequences in lockstep instead of pulling prefix items with
    # .next() inside a takewhile() predicate: that form is Python 2 only and
    # depends on StopIteration leaking out of the predicate to end the scan.
    trimmed = []
    for item, expected in zip(seq, prefix):
        if item != expected:
            break
        trimmed.append(item)

    del seq[:len(trimmed)]
    return trimmed
|
|
|
def bump(obj, attr, newval):
    """
    Replace ``obj.<attr>`` with ``newval`` and hand back what was there.

    Returns the previous attribute value, or None when ``obj`` had no such
    attribute before the call.
    """
    try:
        previous = getattr(obj, attr)
    except AttributeError:
        previous = None
    setattr(obj, attr, newval)
    return previous
|
|
|
class Bunch(object):
    """
    Simple attribute bag: every keyword argument becomes an instance
    attribute of the same name.

    Based on the Python Cookbook 4.18
    """

    def __init__(self, **kwargs):
        """Create the bunch with ``kwargs`` as its initial attributes."""
        self.update(**kwargs)

    def update(self, **kwargs):
        """Add or overwrite attributes from ``kwargs``."""
        vars(self).update(kwargs)
|
|
|
|
|
class Queue(collections.deque):
    """A deque with a generator-based helper for popping from the right."""

    def pop_iter(self):
        """
        Pop the right-most item and yield it; yield nothing if the queue
        is empty.

        NOTE(review): despite the name, this yields at most one item per
        call -- confirm that is the intended contract.
        """
        try:
            item = self.pop()
        except IndexError:
            # Finish the generator with a plain return: raising
            # StopIteration inside a generator body is turned into a
            # RuntimeError on Python 3.7+ (PEP 479).
            return
        yield item
|
|
|
|
|
class SuiteLikeTest(unittest.TestSuite):
    """
    TestSuite that exposes the ``fixtures`` of its first contained test as
    if they were its own.
    """

    @property
    def firstTestClass(self):
        """The first item produced by iterating this suite, or None if empty."""
        return next(iter(self), None)

    @property
    def fixtures(self):
        """
        The ``fixtures`` attribute of the first test, or None when the
        suite is empty or that test has no such attribute.
        """
        first = self.firstTestClass
        if not first:
            return None
        return getattr(first, 'fixtures', None)
|
|
|
|
|
class PrefixSuite(SuiteLikeTest):
    """
    Suite that records fixture prefixes and, while it runs, temporarily
    strips the prefixes inherited from its parent off its own fixture list.

    ``_prefixes`` holds the prefixes attached to this suite and
    ``_parentPrefixes`` the ones accumulated from the suites above it.
    """

    def __init__(self, *args, **kwargs):
        # Set before calling the base constructor so the attributes exist
        # even if it adds tests straight away.
        self._prefixes = []
        self._parentPrefixes = []
        super(PrefixSuite, self).__init__(*args, **kwargs)

    def addSuite(self, suite, prefixes):
        """
        Nest ``suite`` under this one.

        The child inherits this suite's parent prefixes plus its own as
        ``_parentPrefixes`` and receives ``prefixes`` as its ``_prefixes``.

        Returns ``(suite, [])``: the suite that further children should be
        attached to, and a fresh, empty prefix list to accumulate into.
        """
        suite._parentPrefixes = self._parentPrefixes + self._prefixes
        suite._prefixes = prefixes
        super(PrefixSuite, self).addTest(suite)
        return suite, []

    def __call__(self, *args, **kwargs):
        """
        Run the suite inside a savepoint on every database in DATABASES.

        Before running, the leading fixtures shared with ``_parentPrefixes``
        are removed in place from the fixture list returned by
        ``self.fixtures``. Afterwards the savepoints are rolled back and the
        removed fixtures are put back, even if the run raises.
        """
        fixtures = self.fixtures
        trimmed = stripPrefix(fixtures, self._parentPrefixes)
        try:
            savepoints = [(db, transaction.savepoint(using=db))
                          for db in DATABASES]
            try:
                return super(PrefixSuite, self).__call__(*args, **kwargs)
            finally:
                for db, savepoint in savepoints:
                    transaction.savepoint_rollback(savepoint, using=db)
        finally:
            # Restore whenever something was stripped. Checking the
            # truthiness of ``fixtures`` as well would skip the restore in
            # the case where *every* fixture was stripped and the list is
            # now empty.
            if trimmed:
                fixtures[:0] = trimmed
|
|
|
|
|
class RootSuite(PrefixSuite):
    """
    Top-level suite that runs its whole contents inside one managed
    transaction per database in DATABASES.
    """

    def __call__(self, *args, **kwargs):
        """
        Run the suite with Django's per-test transaction hooks neutralised.

        Steps, in order:

        1. Enter managed transaction mode on every database in DATABASES.
        2. Replace ``testcases.restore_transaction_methods``,
           ``testcases.disable_transaction_methods`` and every connection's
           ``close`` with ``nop``, remembering the originals.
        3. Call the original ``disable_transaction_methods`` once and run
           the contained tests.
        4. Put the originals back, call the original
           ``restore_transaction_methods``, roll back and leave transaction
           management on every database, then really close the connections.

        Step 4 runs in a ``finally`` block so the patched module functions
        and connection methods are never left in place when the run raises.
        """
        for db in DATABASES:
            transaction.enter_transaction_management(using=db)
            transaction.managed(True, using=db)

        restore_transaction_methods = bump(
            testcases, 'restore_transaction_methods', nop)
        disable_transaction_methods = bump(
            testcases, 'disable_transaction_methods', nop)
        real_closes = [(c, bump(c, 'close', nop)) for c in connections.all()]

        try:
            disable_transaction_methods()

            # this is *not* a typo - skip PrefixSuite's overridden __call__
            return super(PrefixSuite, self).__call__(*args, **kwargs)
        finally:
            bump(testcases, 'restore_transaction_methods',
                 restore_transaction_methods)
            bump(testcases, 'disable_transaction_methods',
                 disable_transaction_methods)

            restore_transaction_methods()

            for db in DATABASES:
                transaction.rollback(using=db)
                transaction.leave_transaction_management(using=db)

            for conn, real_close in real_closes:
                bump(conn, 'close', real_close)
                conn.close()
|
|
|
|
|
class CasesSuite(PrefixSuite):
    """
    Suite that, while it is being run, tags each test it yields with a
    shared ``_state`` object.
    """

    def __dec_iter(self):
        """
        Iterate the contained tests, attaching the shared state to each.

        One Bunch is created per iteration and shared by all tests; its
        ``is_first`` attribute is updated just before each test is yielded,
        so it is True only while the first test is the current one.
        """
        state = Bunch(savepoint_rollbacks=Queue())

        for test, first in iter_isfirst(super(CasesSuite, self).__iter__()):
            state.update(is_first=first)
            test._state = state
            yield test

    def __iter__(self):
        """
        Return the state-decorating iterator while ``run`` is active, and
        the plain TestSuite iterator otherwise.
        """
        if getattr(self, '_needs_state', False):
            return self.__dec_iter()
        return super(CasesSuite, self).__iter__()

    def run(self, *args, **kwargs):
        """
        Run the suite with state decoration switched on.

        The flag is cleared in a ``finally`` block so that an exception
        escaping the run does not leave later plain iterations decorating
        tests.
        """
        self._needs_state = True
        try:
            return super(CasesSuite, self).run(*args, **kwargs)
        finally:
            self._needs_state = False
|
|
|
|
|
class Tree(dhain_trie.Trie):
    """
    Trie of suites keyed by their fixture lists, convertible into a nested
    RootSuite.
    """

    def addSuite(self, suite):
        """
        Store ``suite`` under its fixture list.

        Returns False (storing nothing) when ``suite.fixtures`` is None and
        True otherwise. If a suite is already stored under the same fixture
        list, ``suite`` is added to that suite as a test instead of being
        stored itself.
        """
        fixtures = suite.fixtures
        if fixtures is None:
            return False

        try:
            owner = self[fixtures]
        except (KeyError, dhain_trie.NeedMore):
            # Lookup failed: this suite becomes the entry for these fixtures.
            self[fixtures] = suite
            return True

        # we could have just stripped all the fixtures; by setting
        # _parentPrefixes, we get the modify-restore functionality
        # of PrefixSuite.
        suite._parentPrefixes = owner.fixtures[:]
        owner.addTest(suite)
        return True

    def asSuite(self):
        """
        Build and return a RootSuite mirroring the trie.

        Nodes are visited from an explicit stack starting at ``self.root``.
        Keys met on the way down are accumulated as prefixes; a node that
        holds a value has that value attached via ``addSuite`` to the
        nearest suite above it, and then serves as the parent for the
        nodes below it.
        """
        root_suite = RootSuite()
        pending = [(root_suite, [], self.root)]

        while pending:
            suite, prefixes, node = pending.pop()

            if node.key:
                prefixes.append(node.key)

            if node.value is not node.no_value:
                suite, prefixes = suite.addSuite(node.value, prefixes)

            # Every child gets its own copy of the prefix list so siblings
            # do not see each other's additions.
            for child in node.nodes.values():
                pending.append((suite, prefixes[:], child))

        return root_suite
|
|
|
|
|
class TestSuiteRunner(DjangoTestSuiteRunner):
    """
    Test runner that regroups tests by class and by shared fixtures before
    handing them to the stock Django runner.
    """

    def run_suite(self, suite, **kwargs):
        """
        Regroup ``suite`` and run it.

        When the configured connections do not support transactions the
        suite is run unchanged. Otherwise tests are collected into one
        CasesSuite per test class, those suites are arranged into a Tree
        keyed by fixtures, and the resulting suite is run. Class suites the
        tree does not accept (``Tree.addSuite`` returned False) are
        appended to the end of it.

        Returns whatever the parent ``run_suite`` returns.
        """
        if not connections_support_transactions():
            return super(TestSuiteRunner, self).run_suite(suite, **kwargs)

        # collate by test's class, remembering first-seen order so the
        # grouping does not depend on dict ordering, and creating a
        # CasesSuite only when a class is met for the first time
        suites_by_class = {}
        class_suites = []
        for test in suite:
            kls = test.__class__
            cases = suites_by_class.get(kls)
            if cases is None:
                cases = suites_by_class[kls] = CasesSuite()
                class_suites.append(cases)
            cases.addTest(test)

        tree = Tree()
        # Distinct loop-variable name: on Python 2 a list comprehension
        # rebinds its variable in the enclosing function scope.
        excluded = [cases for cases in class_suites
                    if not tree.addSuite(cases)]
        grouped = tree.asSuite()
        grouped.addTests(excluded)

        return super(TestSuiteRunner, self).run_suite(grouped, **kwargs)