Created
June 2, 2022 14:46
-
-
Save sprout42/a00e6bb4ebb7689ecc4dc8d612e2426e to your computer and use it in GitHub Desktop.
Python unittest runner and debug tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import gc | |
import os | |
import sys | |
import glob | |
import time | |
import os.path | |
import unittest | |
import unittest.case | |
import argparse | |
import warnings | |
import importlib | |
try: | |
from memory_profiler import profile | |
except ImportError: | |
# Make a fake profile decorator | |
def profile(func): | |
return func | |
def run_module_tests(module, tests, verbosity=1, failfast=False, buffer=False): | |
args = { | |
'verbosity': verbosity, | |
'failfast': failfast, | |
'buffer': buffer, | |
} | |
errors = [] | |
failures = [] | |
skipped = [] | |
total = 0 | |
# Module setup | |
if hasattr(module, 'setUpModule'): | |
module.setUpModule() | |
for testcls in tests: | |
# Class setup | |
if hasattr(testcls, 'setUpClass'): | |
testcls.setUpClass() | |
for testname in tests[testcls]: | |
runner = TestRunner(**args) | |
test = testcls(testname) | |
results = runner.run(test) | |
# collect results | |
errors.extend(('%s.%s.%s' % (t.__class__.__module__, t.__class__.__name__, t._testMethodName), e) for t, e in results.errors) | |
failures.extend(('%s.%s.%s' % (t.__class__.__module__, t.__class__.__name__, t._testMethodName), e) for t, e in results.failures) | |
skipped.extend(('%s.%s.%s' % (t.__class__.__module__, t.__class__.__name__, t._testMethodName), r) for t, r in results.skipped) | |
total += results.testsRun | |
del results, runner, test | |
gc.collect() | |
# Test class cleanup, the class teardown function should only be called | |
# if there were no failures | |
if not errors and not failures and hasattr(testcls, 'tearDownClass'): | |
testcls.tearDownClass() | |
# Always call the final doClassCleanups function (if defined) | |
if hasattr(testcls, 'doClassCleanups'): | |
testcls.doClassCleanups() | |
# Module cleanup | |
if hasattr(module, 'tearDownModule'): | |
module.tearDownModule() | |
unittest.case.doModuleCleanups() | |
return errors, failures, skipped, total | |
class TestRunner(unittest.TextTestRunner): | |
def run(self, test): | |
""" | |
Custom Test Runner that doesn't immediately print results, so it also | |
doesn't track execution time in this class. | |
""" | |
result = self._makeResult() | |
unittest.registerResult(result) | |
result.failfast = self.failfast | |
result.buffer = self.buffer | |
result.tb_locals = self.tb_locals | |
with warnings.catch_warnings(): | |
if self.warnings: | |
# if self.warnings is set, use it to filter all the warnings | |
warnings.simplefilter(self.warnings) | |
# if the filter is 'default' or 'always', special-case the | |
# warnings from the deprecated unittest methods to show them | |
# no more than once per module, because they can be fairly | |
# noisy. The -Wd and -Wa flags can be used to bypass this | |
# only when self.warnings is None. | |
if self.warnings in ['default', 'always']: | |
warnings.filterwarnings('module', | |
category=DeprecationWarning, | |
message=r'Please use assert\w+ instead.') | |
startTestRun = getattr(result, 'startTestRun', None) | |
if startTestRun is not None: | |
startTestRun() | |
try: | |
test(result) | |
except KeyboardInterrupt: | |
raise | |
finally: | |
stopTestRun = getattr(result, 'stopTestRun', None) | |
if stopTestRun is not None: | |
stopTestRun() | |
unittest.removeResult(result) | |
return result | |
def print_results(runtime, total, skipped, errors, failures): | |
separator1 = '=' * 70 | |
separator2 = '-' * 70 | |
info = [] | |
if errors: | |
info.append('errors=%s' % len(errors)) | |
for test, err in errors: | |
name = test.rsplit('.', 1)[-1] | |
print() | |
print(separator1) | |
print('ERROR: %s (%s)' % (name, test)) | |
print(separator2) | |
print(err) | |
if failures: | |
info.append('failures=%s' % len(failures)) | |
for test, err in failures: | |
name = test.rsplit('.', 1)[-1] | |
print() | |
print(separator1) | |
print('FAIL: %s (%s)' % (name, test)) | |
print(separator2) | |
print(err) | |
if skipped: | |
info.append('skipped=%s' % len(skipped)) | |
print(separator2) | |
print("Ran %d test%s in %.3fs" % (total, total != 1 and "s" or "", runtime)) | |
print() | |
result = 'FAILED' if errors or failures else 'OK' | |
if info: | |
print('%s (%s)' % (result, ', '.join(info))) | |
else: | |
print(result) | |
@profile | |
def run(testdirs, filefilters, verbosity=1, failfast=False, buffer=False): | |
errors = [] | |
failures = [] | |
skipped = [] | |
total = 0 | |
test_files = [] | |
for subdir in testdirs: | |
if filefilters: | |
for filt in filefilters: | |
glob_path = '%s/**/tests/*%s*' % (subdir, filt) | |
test_files.extend(f for f in glob.glob(glob_path, recursive=True)) | |
else: | |
glob_path = '%s/**/tests/test*.py' % subdir | |
test_files.extend(f for f in glob.glob(glob_path, recursive=True)) | |
tests_started = time.perf_counter() | |
for test_file_path in test_files: | |
mname = test_file_path[:-3].replace(os.sep, '.') | |
module = importlib.import_module(mname) | |
# See if this module has any tests | |
tests = {} | |
for attr in dir(module): | |
testcls = getattr(module, attr) | |
if isinstance(testcls, type) and \ | |
issubclass(testcls, unittest.TestCase) and \ | |
not hasattr(testcls, '__unittest_skip__'): | |
for testname in dir(testcls): | |
if testname.startswith('test_') and \ | |
callable(getattr(testcls, testname)): | |
if testcls not in tests: | |
tests[testcls] = [] | |
tests[testcls].append(testname) | |
# Run tests for the module found | |
if tests: | |
if verbosity == 2: | |
print('\n######## RUNNING TESTS in %s ########\n' % mname) | |
module_results = run_module_tests(module, tests, verbosity, failfast, buffer) | |
errors.extend(module_results[0]) | |
failures.extend(module_results[1]) | |
skipped.extend(module_results[2]) | |
total += module_results[3] | |
elif verbosity == 2: | |
print('\n######## NO TESTS FOUND FOR %s ########' % mname) | |
# explicitly delete the imported module | |
del module | |
gc.collect() | |
tests_ended = time.perf_counter() | |
print_results(tests_ended-tests_started, total, skipped, errors, failures) | |
sys.exit(len(errors) + len(failures)) | |
def find_test_subdirs(): | |
paths = glob.glob('**/tests', recursive=True) | |
return list(set(p.split(os.sep, 1)[0] for p in paths if 'site-packages' not in p)) | |
def main(): | |
dirs = find_test_subdirs() | |
parser = argparse.ArgumentParser(description='Unit Test Runner') | |
parser.add_argument('-d', '--testdirs', nargs='*', choices=dirs, | |
help='One or more subdirectories to run tests from') | |
# options to act similar to the normal "unittest" CLI | |
parser.add_argument('-v', '--verbosity', | |
type=int, default=1, nargs='?', const=2, choices=[0,1,2], | |
help='Verbosity of tests: 1 (default) print ".|E|F" for each test, ' | |
'2 (max) print test name, 0 print only errors and failures') | |
parser.add_argument('-f', '--failfast', default=False, action='store_true', | |
help='halt on the first failure') | |
parser.add_argument('-b', '--buffer', default=False, action='store_true', | |
help='buffer stdout and only print if a test fails') | |
# Any extra arguments are assumed to be file filters | |
args, file_filters = parser.parse_known_args() | |
if args.testdirs: | |
run(args.testdirs, file_filters, args.verbosity, args.failfast, args.buffer) | |
else: | |
run(dirs, file_filters, args.verbosity, args.failfast, args.buffer) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment