Skip to content

Instantly share code, notes, and snippets.

@klrmn
Created May 21, 2014 21:40
Show Gist options
  • Save klrmn/06d1cf81b8f3bcb8a77c to your computer and use it in GitHub Desktop.
Save klrmn/06d1cf81b8f3bcb8a77c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import unittest
from testresources import OptimisingTestSuite
from subunit.run import SubunitTestProgram, SubunitTestRunner, get_default_formatter
import io, sys, os, logging
import fixtures
from testtools import content
from testtools.testcase import gather_details
class TestLoader(unittest.TestLoader):
"""Test loader that extends unittest.TestLoader to:
* support names that can be a combination of modules and directories
"""
def loadTestsFromNames(self, names, module=None):
"""Return a suite of all tests cases found using the given sequence
of string specifiers. See 'loadTestsFromName()'.
"""
suites = []
for name in names:
if os.path.isdir(name):
top_level = os.path.split(name)[0]
suites.extend(self.discover(name, top_level_dir=top_level))
else:
suites.extend(self.loadTestsFromName(name, module))
return self.suiteClass(suites)
def switch(self, previous_test, test, result):
"""Keeps track of the current resource set. Attaches errors removing
resources to the previous test, and errors adding resources to the
current test.
:param previous_test: Most recent completed test, or None
:param test: Current test, or None.
:param result: TestResult object to report activity on.
"""
if test:
resources = getattr(test, 'resources', [])
new_resource_set = set()
for name, resource in resources:
new_resource_set.update(resource.neededResources())
else:
new_resource_set = set()
# determine which uses cannot be reused or don't exist now
new_resources = new_resource_set - self.current_resources
old_resources = self.current_resources - new_resource_set
FORMAT = '%(asctime)s [%(levelname)s] %(name)s %(lineno)d: %(message)s'
fixture = fixtures.FakeLogger(format=FORMAT)
fixture.setUp()
abort = False
# attribute any failures in resource teardown to the previous test
# print("Removing %s" % old_resources)
for resource in old_resources:
try:
resource.finishedWith(resource._currentResource, result)
self.current_resources.discard(resource)
except Exception as e:
fixture.addDetail('traceback', content.TracebackContent(sys.exc_info(), previous_test))
from acceptance.resources import DirtyWebDriverResource, WebDriverRM
if isinstance(e, DirtyWebDriverResource):
WebDriverRM.dirtied(e.resource) # try to get a new instance
else:
abort = True
break
# attribute any failures in resource setup to the current test
# print("Adding %s" % new_resources)
if not abort:
for resource in new_resources:
try:
resource.getResource(result)
self.current_resources.add(resource)
except Exception as e:
fixture.addDetail('traceback', content.TracebackContent(sys.exc_info(), previous_test))
from acceptance.resources import DirtyWebDriverResource, WebDriverRM
if isinstance(e, DirtyWebDriverResource):
WebDriverRM.dirtied(e.resource) # try to get a new instance
else:
abort = True
break
# print(self.current_resources)
if abort:
if test:
gather_details(fixture.getDetails(), test.getDetails())
result.addError(test, details=test.getDetails())
else:
gather_details(fixture.getDetails(), previous_test.getDetails())
result.addError(previous_test, details=previous_test.getDetails())
fixture.cleanUp()
return not abort
def run(self, result):
self.sortTests()
# the next several lines' job is to tell me how
# tests are being divided for parallelization
node = os.getenv('DEFAULT_NODE', 'node1')
import time
time.sleep(10 * int(node[-1]))
for test in self._tests:
if not hasattr(test, 'id'):
print('%s suite' % node)
else:
print('%s %s' % (node, test.id()))
return result
previous_test = None
# print(self._tests)
for test in self._tests:
if not hasattr(test, 'id'):
continue
# print('%s' % test)
if result.shouldStop:
break
if self.switch(previous_test, test, result):
test(result)
previous_test = test
self.switch(previous_test, None, result)
return result
OptimisingTestSuite.switch = switch
OptimisingTestSuite.run = run
OptimisingTestSuite.current_resources = set()
def main():
# Disable the default buffering, for Python 2.x where pdb doesn't do it
# on non-ttys.
stream = get_default_formatter()
runner = SubunitTestRunner
# Patch stdout to be unbuffered, so that pdb works well on 2.6/2.7.
binstdout = io.open(sys.stdout.fileno(), 'wb', 0)
if sys.version_info[0] > 2:
sys.stdout = io.TextIOWrapper(binstdout, encoding=sys.stdout.encoding)
else:
sys.stdout = binstdout
loader = TestLoader()
loader.suiteClass = OptimisingTestSuite
SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner,
stdout=sys.stdout, testLoader=loader)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment