Skip to content

Instantly share code, notes, and snippets.

@wickman
Created September 12, 2014 20:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wickman/5f3fb184f25f6931445c to your computer and use it in GitHub Desktop.
Save wickman/5f3fb184f25f6931445c to your computer and use it in GitHub Desktop.
custom pants pytest runner
diff --git a/src/python/twitter/pants/BUILD b/src/python/twitter/pants/BUILD
index c4cfaac..656392d 100644
--- a/src/python/twitter/pants/BUILD
+++ b/src/python/twitter/pants/BUILD
@@ -27,13 +27,13 @@ python_library(
name='pants-deps',
dependencies=[
pants('3rdparty/python:ansicolors'),
+ pants('3rdparty/python:coverage'),
pants('3rdparty/python:elementtree'),
pants('3rdparty/python:mako'),
pants('3rdparty/python:markdown'),
pants('3rdparty/python:psutil'),
pants('3rdparty/python:pygments'),
pants('3rdparty/python:pytest'),
- pants('3rdparty/python:pytest-cov'),
pants('3rdparty/python:requests'),
python_requirement('pylint==0.25.1', version_filter=pylint_build_filter),
]
@@ -44,6 +44,7 @@ python_library(
dependencies=[
pants(':pants-deps'),
pants('src/python/twitter/common/collections'),
+ pants('src/python/twitter/common/concurrent'),
pants('src/python/twitter/common/config'),
pants('src/python/twitter/common/confluence'),
pants('src/python/twitter/common/contextutil'),
@@ -53,6 +54,7 @@ python_library(
pants('src/python/twitter/common/log'),
pants('src/python/twitter/common/process'),
pants('src/python/twitter/common/python'),
+ pants('src/python/twitter/common/recordio'),
pants('src/python/twitter/common/quantity'),
pants('src/python/twitter/common/util'),
pants('src/python/twitter/thrift/text'),
@@ -77,7 +79,7 @@ python_library(
# TODO(wickman) This version should be automatically pulled in from twitter.pants.version
provides=setup_py(
name='twitter.pants',
- version='0.0.12',
+ version='0.0.14',
description="Twitter's pants build tool.",
url='https://github.com/twitter/commons',
license='Apache License, Version 2.0',
diff --git a/src/python/twitter/pants/base/address.py b/src/python/twitter/pants/base/address.py
index 8d4a192..25178a4 100644
--- a/src/python/twitter/pants/base/address.py
+++ b/src/python/twitter/pants/base/address.py
@@ -93,5 +93,15 @@ class Address(object):
def __ne__(self, other):
return not self.__eq__(other)
+ def compact_repr(self, source_root=None):
+ def _repr():
+ if os.path.basename(self.buildfile.parent_path) == self.target_name:
+ return os.path.dirname(self.buildfile.relpath)
+ else:
+ return '%s:%s' % (os.path.dirname(self.buildfile.relpath), self.target_name)
+ if source_root:
+ return os.path.relpath(_repr(), source_root)
+ return _repr()
+
def __repr__(self):
return "%s:%s" % (self.buildfile, self.target_name)
diff --git a/src/python/twitter/pants/commands/build.py b/src/python/twitter/pants/commands/build.py
index b32363c..b678c82 100644
--- a/src/python/twitter/pants/commands/build.py
+++ b/src/python/twitter/pants/commands/build.py
@@ -95,6 +95,7 @@ class Build(Command):
if not target:
self.error("Target %s does not exist" % address)
+
self.targets.update(tgt for tgt in target.resolve() if is_concrete(tgt))
def debug(self, message):
diff --git a/src/python/twitter/pants/python/pytest_runner.py b/src/python/twitter/pants/python/pytest_runner.py
new file mode 100644
index 0000000..62a56a4
--- /dev/null
+++ b/src/python/twitter/pants/python/pytest_runner.py
@@ -0,0 +1,169 @@
+"""
+Pants Python pytest driver. For more information about the pytest plugin interface, see:
+http://pytest.org/latest/plugins.html#well-specified-hooks
+
+This plugin provides a recordio interface to test results as well as a code coverage driver.
+"""
+
+import json
+import os
+import struct
+
+from pytest import Function
+
+
+_RECORD_STREAM = None
+_COV = None
+
+
+class PythonTestRecord(object):
+ def __init__(self, json_dict):
+ pass
+
+
+class PythonTestHeader(PythonTestRecord):
+ def __init__(self, json_dict):
+ self.filename, self.functions = map(json_dict.get, ('filename', 'tests'))
+ super(PythonTestHeader, self).__init__(json_dict)
+
+
+class PythonTestResult(PythonTestRecord):
+ def __init__(self, json_dict):
+ self.function, self.filename, self.lineno, self.outcome = map(
+ json_dict.get, ('function', 'filename', 'lineno', 'outcome'))
+ super(PythonTestResult, self).__init__(json_dict)
+
+
+class PythonTestFooter(PythonTestRecord):
+ pass
+
+
+def write_record(fp, json_dict):
+ """Write one Pants pytest driver record (json_dict) to the stream (fp)."""
+ json_blob = json.dumps(json_dict).encode('utf-8')
+ header = struct.pack('>L', len(json_blob))
+ fp.write(header + json_blob)
+ fp.flush()
+
+
+def read_record(fp):
+ """Read one record Pants pytest driver record from the stream fp.
+
+ The pytest_runner and test_builder communicate via a simple recordio-based mechanism,
+ framed in the following fashion:
+
+ [4 byte int "len1"][string blob of length "len1"]
+ [4 byte int "len2"][string blob of length "len2"]
+ ...
+
+ The return value will be an object of type PythonTestRecord or None if nothing was processed.
+ There are three distinct subtypes:
+ - PythonTestHeader*
+ - PythonTestResult*
+ - PythonTestFooter
+
+ PythonTestResult has the following attributes:
+ .function
+ .filename
+ .lineno
+ .outcome (one of 'passed', 'failed', 'skipped')
+
+ Messages types with '*' may be delivered multiple times. Only one footer will be delivered
+ and signifies that a test run has completed.
+ """
+ header = fp.read(4)
+ if len(header) == 0:
+ fp.seek(fp.tell())
+ return
+ blob_len = struct.unpack('>L', header)[0]
+ read_blob = fp.read(blob_len)
+ if len(read_blob) != blob_len:
+ raise ValueError('Failed to read stream!')
+ read_blob = json.loads(read_blob)
+ if read_blob['type'] == 'header':
+ return PythonTestHeader(read_blob)
+ elif read_blob['type'] == 'footer':
+ return PythonTestFooter(read_blob)
+ elif read_blob['type'] == 'test':
+ return PythonTestResult(read_blob)
+ raise ValueError('Unknown record type encountered: %s' % read_blob.get('type'))
+
+
+def pytest_addoption(parser):
+ group = parser.getgroup("terminal reporting")
+ group.addoption('--pants_test_recordio',
+ action="store",
+ dest="pants_test_recordio",
+ metavar="PATH",
+ default=None,
+ help="Create a pants test recordio at this path.")
+ group.addoption('--pants_test_coverage_root',
+ action="store",
+ dest="pants_test_coverage_root",
+ metavar="PATH",
+ default=None,
+ help="If specified, enable coverage at this root directory.")
+
+
+def pytest_configure(config):
+ global _RECORD_STREAM
+ global _COV
+ if config.option.pants_test_recordio:
+ _RECORD_STREAM = open(config.option.pants_test_recordio, 'ab+')
+ if config.option.pants_test_coverage_root:
+ root = config.option.pants_test_coverage_root
+ try:
+ from coverage import coverage
+ except ImportError:
+ print('Failed to import coverage. Coverage disabled.')
+ return
+ _COV = coverage(
+ config_file=os.path.join(root, 'config'),
+ auto_data=True,
+ )
+ _COV.start()
+
+
+def translate_function_names(report):
+ for result in report.result:
+ if not isinstance(result, Function):
+ continue
+ nodeid = report.nodeid.split('::')
+ if len(nodeid) == 1:
+ yield result.name
+ else:
+ yield '%s.%s' % (nodeid[1], result.name)
+
+
+def pytest_collectreport(report):
+ write_record(_RECORD_STREAM, dict(
+ type='header',
+ filename=report.nodeid.split('::')[0],
+ tests=list(translate_function_names(report))
+ ))
+
+
+def pytest_runtest_logreport(report):
+ if _RECORD_STREAM is None or report.when != 'call':
+ return
+ _, lineno, function = report.location
+ filename = report.fspath
+ outcome = report.outcome
+ write_record(_RECORD_STREAM, dict(
+ type='test',
+ function=function,
+ filename=filename,
+ lineno=lineno,
+ outcome=outcome,
+ ))
+
+
+def pytest_unconfigure(config):
+ if _RECORD_STREAM is None:
+ return
+ if _COV:
+ _COV.stop()
+ write_record(_RECORD_STREAM, dict(
+ type='footer'
+ ))
+ _RECORD_STREAM.close()
diff --git a/src/python/twitter/pants/python/python_chroot.py b/src/python/twitter/pants/python/python_chroot.py
index d566ec0..4588a11 100644
--- a/src/python/twitter/pants/python/python_chroot.py
+++ b/src/python/twitter/pants/python/python_chroot.py
@@ -61,16 +61,23 @@ class PythonChroot(object):
def __init__(self, target):
Exception.__init__(self, "Not a valid Python dependency! Found: %s" % target)
- def __init__(self, target, root_dir, extra_targets=None, builder=None, interpreter=None,
- conn_timeout=None):
+ def __init__(self, target,
+ root_dir,
+ extra_targets=None,
+ builder=None,
+ interpreter=None,
+ platforms=None,
+ conn_timeout=None):
self._config = Config.load()
self._target = target
self._root = root_dir
self._interpreter = interpreter or PythonInterpreter.get()
- self._cache = BuildCache(os.path.join(self._config.get('python-setup', 'artifact_cache'),
- '%s' % self._interpreter.identity))
+ self._cache = BuildCache(
+ os.path.join(self._config.get('python-setup', 'artifact_cache'),
+ '%s' % self._interpreter.identity))
self._extra_targets = list(extra_targets) if extra_targets is not None else []
- self._resolver = MultiResolver(self._config, target, conn_timeout=conn_timeout)
+ self._resolver = MultiResolver(
+ self._config, target, platforms=platforms, conn_timeout=conn_timeout)
self._builder = builder or PEXBuilder(tempfile.mkdtemp(), interpreter=self._interpreter)
def __del__(self):
@@ -165,15 +172,15 @@ class PythonChroot(object):
for antlr in targets['antlrs']:
generated_reqs.add(self._generate_antlr_requirement(antlr))
+ reqs_to_build = OrderedSet()
targets['reqs'] |= generated_reqs
for req in targets['reqs']:
if not req.should_build(self._interpreter.python, Platform.current()):
self.debug('Skipping %s based upon version filter' % req)
continue
+ reqs_to_build.add(req)
self._dump_requirement(req._requirement, False, req._repository)
- reqs_to_build = (req for req in targets['reqs']
- if req.should_build(self._interpreter.python, Platform.current()))
for dist in self._resolver.resolve(reqs_to_build, interpreter=self._interpreter):
self._dump_distribution(dist)
diff --git a/src/python/twitter/pants/python/resolver.py b/src/python/twitter/pants/python/resolver.py
index 0a9b604..dd56a31 100644
--- a/src/python/twitter/pants/python/resolver.py
+++ b/src/python/twitter/pants/python/resolver.py
@@ -40,8 +40,9 @@ class MultiResolver(ResolverBase):
return Crawler(cache=config.get('python-setup', 'download_cache'),
conn_timeout=conn_timeout)
- def __init__(self, config, target, conn_timeout=None):
- platforms = config.getlist('python-setup', 'platforms', ['current'])
+ def __init__(self, config, target, platforms=None, conn_timeout=None):
+ if platforms is None:
+ platforms = config.getlist('python-setup', 'platforms', ['current'])
if isinstance(target, PythonBinary) and target.platforms:
platforms = target.platforms
self._install_cache = config.get('python-setup', 'install_cache')
diff --git a/src/python/twitter/pants/python/sdist_builder.py b/src/python/twitter/pants/python/sdist_builder.py
index f35f693..4472557 100644
--- a/src/python/twitter/pants/python/sdist_builder.py
+++ b/src/python/twitter/pants/python/sdist_builder.py
@@ -21,7 +21,6 @@ import os
import subprocess
import sys
-from twitter.common.contextutil import pushd
from twitter.common.python.installer import Packager
diff --git a/src/python/twitter/pants/python/test_builder.py b/src/python/twitter/pants/python/test_builder.py
index 6f7a019..aba8840 100644
--- a/src/python/twitter/pants/python/test_builder.py
+++ b/src/python/twitter/pants/python/test_builder.py
@@ -16,126 +16,247 @@
from __future__ import print_function
-__author__ = 'Brian Wickman'
-
try:
import configparser
except ImportError:
import ConfigParser as configparser
+from contextlib import contextmanager
import errno
+import json
import os
-import time
+import pkgutil
+import shutil
import signal
+import subprocess
import sys
+from textwrap import dedent
+import threading
+import time
from twitter.common.collections import OrderedSet
-from twitter.common.contextutil import temporary_file
-from twitter.common.dirutil import safe_mkdir
+from twitter.common.contextutil import temporary_dir
+from twitter.common.dirutil import safe_mkdir, safe_open
from twitter.common.lang import Compatibility
-from twitter.common.quantity import Amount, Time
from twitter.common.python.interpreter import PythonInterpreter
from twitter.common.python.pex import PEX
from twitter.common.python.pex_builder import PEXBuilder
-
+from twitter.common.quantity import Amount, Time
from twitter.pants.base import Config, ParseContext
-from twitter.pants.python.python_chroot import PythonChroot
from twitter.pants.targets import (
+ PythonLibrary,
PythonRequirement,
- PythonTarget,
PythonTestSuite,
- PythonTests)
+ PythonTests,
+)
+try:
+ from Queue import Queue, Empty
+except ImportError:
+ from queue import Queue, Empty
-class PythonTestResult(object):
- @staticmethod
- def timeout():
- return PythonTestResult('TIMEOUT')
+from .python_chroot import PythonChroot
+from .pytest_runner import (
+ read_record as read_pytest_record,
+ PythonTestFooter,
+ PythonTestResult,
+)
- @staticmethod
- def exception():
- return PythonTestResult('EXCEPTION')
+from pkg_resources import DistributionNotFound
- @staticmethod
- def rc(value):
- return PythonTestResult('SUCCESS' if value == 0 else 'FAILURE',
- rc=value)
- def __init__(self, msg, rc=None):
- self._rc = rc
- self._msg = msg
+class Display(object):
+ try:
+ import colors
+ except ImportError:
+ colors = None
- def __str__(self):
- return self._msg
+ DEFAULT_WIDTH = 100
+
+ @classmethod
+ def render(cls, string, color):
+ if cls.colors is None:
+ return string
+ color_fn = getattr(cls.colors, color, None)
+ if not callable(color_fn):
+ return string
+ return color_fn(string)
+
+ @classmethod
+ def isatty(cls, filelike):
+ if not hasattr(filelike, 'fileno'):
+ return False
+ return os.isatty(filelike.fileno()) and filelike.fileno() in (
+ sys.stdout.fileno(), sys.stderr.fileno())
+
+ @classmethod
+ def length(cls, string):
+ """calculate printable length of the string"""
+ if cls.colors is None:
+ return len(str(string))
+ return len(cls.colors.strip_color(str(string)))
+
+ def __init__(self, filelike, width=DEFAULT_WIDTH):
+ self._filelike = filelike
+ self._isatty = self.isatty(filelike)
+ self._colorful = self._isatty
+ self._width = width
+ self._width_duration = 0.5
+ self._width_expiration = time.time()
@property
- def success(self):
- return self._rc == 0
+ def width(self):
+ """Return a guess for the terminal width."""
+ if not self._isatty:
+ return self._width
+ now = time.time()
+ if now > self._width_expiration:
+ self._width_expiration = now + self._width_duration
+ try:
+ self._width = int(os.popen('stty size', 'r').read().split()[1])
+ except (TypeError, ValueError):
+ pass
+ return self._width
+
+ def center(self, string, filler='='):
+ filler_width = self.width - self.length(string)
+ if not self._colorful and self.colors:
+ string = self.colors.strip_color(string)
+ print('%s%s%s' % (filler * (filler_width // 2), string,
+ (filler * (filler_width // 2 + filler_width % 2))), file=self._filelike)
+
+ def write(self, string):
+ if not self._colorful and self.colors:
+ string = self.colors.strip_color(string)
+ print(string, file=self._filelike)
+
+ def left_right(self, left_string, right_string):
+ width = self.width
+ if not self._colorful and self.colors:
+ left_string = self.colors.strip_color(left_string)
+ right_string = self.colors.strip_color(right_string)
+ left_length = self.length(left_string)
+ right_length = self.length(right_string)
+ leftover = max(width - left_length - right_length, 1)
+ dots_length = 15 - right_length
+ dots_string = leftover * ' ' + dots_length * '.' + ' '
+ print('%s%s%s' % (left_string, dots_string[-leftover:], right_string),
+ file=self._filelike)
+
+
+class PythonTestOutcome(object):
+ SUCCESS = 0
+ FAILURE = 1
+ SKIPPED = 2
+ TIMEOUT = 3
+ CANCELLED = 4
+ EXCEPTION = 5
+
+ NAME_MAP = {
+ SUCCESS: 'SUCCESS',
+ FAILURE: 'FAILURE',
+ SKIPPED: 'SKIPPED',
+ TIMEOUT: 'TIMEOUT',
+ CANCELLED: 'CANCELLED',
+ EXCEPTION: 'EXCEPTION',
+ }
+
+ COLOR_MAP = {
+ SUCCESS: 'green',
+ FAILURE: 'red',
+ SKIPPED: 'yellow',
+ TIMEOUT: 'cyan',
+ CANCELLED: 'yellow',
+ EXCEPTION: 'red',
+ }
+ @classmethod
+ def timeout(cls):
+ return cls(cls.TIMEOUT)
-DEFAULT_COVERAGE_CONFIG = """
-[run]
-branch = True
-timid = True
+ @classmethod
+ def exception(cls):
+ return cls(cls.EXCEPTION)
-[report]
-exclude_lines =
- def __repr__
- raise NotImplementedError
+ @classmethod
+ def cancelled(cls):
+ return cls(cls.CANCELLED)
-ignore_errors = True
-"""
+ @classmethod
+ def success(cls):
+ return cls(cls.SUCCESS)
-def generate_coverage_config(target):
- cp = configparser.ConfigParser()
- cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG))
- cp.add_section('html')
- target_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage',
- os.path.dirname(target.address.buildfile.relpath), target.name)
- safe_mkdir(target_dir)
- cp.set('html', 'directory', target_dir)
- return cp
+ @classmethod
+ def failure(cls):
+ return cls(cls.FAILURE)
+ @classmethod
+ def skipped(cls):
+ return cls(cls.SKIPPED)
-class PythonTestBuilder(object):
- class InvalidDependencyException(Exception): pass
- class ChrootBuildingException(Exception): pass
+ def __init__(self, type_):
+ if type_ not in self.NAME_MAP:
+ raise ValueError('Unknown type %s' % type_)
+ self._type = type_
- TESTING_TARGETS = None
+ def __str__(self):
+ return Display.render(self.NAME_MAP[self._type], self.COLOR_MAP[self._type])
- # TODO(wickman) Expose these as configuratable parameters
- TEST_TIMEOUT = Amount(2, Time.MINUTES)
- TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS)
+ @property
+ def succeeded(self):
+ return self._type in (self.SUCCESS, self.SKIPPED)
- def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None):
- self.targets = targets
- self.args = args
- self.root_dir = root_dir
- self.interpreter = interpreter or PythonInterpreter.get()
- self.successes = {}
- self._conn_timeout = conn_timeout
+ @property
+ def failed(self):
+ return self._type in (self.FAILURE, self.EXCEPTION, self.TIMEOUT)
+
+
+DEFAULT_COVERAGE_CONFIG = dedent("""
+ [run]
+ branch = True
+ timid = True
+
+ [report]
+ exclude_lines =
+ def __repr__
+ raise NotImplementedError
+
+ ignore_errors = True
+""")
+
+
+@contextmanager
+def coverage_config(path):
+ with safe_open(path, 'wb') as fp:
+ cp = configparser.ConfigParser()
+ cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG))
+ yield cp
+ cp.write(fp)
- def run(self):
- self.successes = {}
- rv = self._run_tests(self.targets)
- for target in sorted(self.successes):
- print('%-80s.....%10s' % (target, self.successes[target]))
- return 0 if rv.success else 1
+
+class PythonTestChroot(object):
+ """
+ A wrapper around PythonChroot to encapsulate test-specific dependencies (e.g. pytest,
+ coverage) and drivers (e.g. pytest_runner.)
+ """
+ class Error(Exception): pass
+ class ChrootBuildingException(Error): pass
+
+ TESTING_TARGETS = {}
@classmethod
- def generate_test_targets(cls):
- if cls.TESTING_TARGETS is None:
+ def generate_test_targets(cls, interpreter):
+ if interpreter.version_string not in cls.TESTING_TARGETS:
with ParseContext.temp():
- cls.TESTING_TARGETS = [
+ cls.TESTING_TARGETS[interpreter.version_string] = [
PythonRequirement('pytest'),
- PythonRequirement('pytest-cov'),
- PythonRequirement('coverage==3.6b1'),
+ PythonRequirement('coverage'),
PythonRequirement('unittest2', version_filter=lambda py, pl: py.startswith('2')),
PythonRequirement('unittest2py3k', version_filter=lambda py, pl: py.startswith('3'))
]
- return cls.TESTING_TARGETS
+ return cls.TESTING_TARGETS[interpreter.version_string]
- @staticmethod
- def generate_junit_args(target):
+ @classmethod
+ def generate_junit_args(cls, target):
args = []
xml_base = os.getenv('JUNIT_XML_BASE')
if xml_base:
@@ -146,94 +267,345 @@ class PythonTestBuilder(object):
os.makedirs(os.path.dirname(xml_path))
except OSError as e:
if e.errno != errno.EEXIST:
- raise PythonTestBuilder.ChrootBuildingException(
+ raise cls.ChrootBuildingException(
"Unable to establish JUnit target: %s! %s" % (target, e))
args.append('--junitxml=%s' % xml_path)
return args
- @staticmethod
- def cov_setup(target, chroot):
- cp = generate_coverage_config(target)
- with temporary_file(cleanup=False) as fp:
- cp.write(fp)
- filename = fp.name
- if target.coverage:
- source = target.coverage
+ COVERAGE_ROOT = '.coverage'
+
+ @classmethod
+ def coverage_root(cls, builder):
+ return os.path.join(builder.path(), cls.COVERAGE_ROOT)
+
+ @classmethod
+ def write_coverage_setup(cls, target, builder):
+ coverage_root = cls.coverage_root(builder)
+ with coverage_config(os.path.join(coverage_root, 'config')) as cp:
+ if target.coverage:
+ modules = target.coverage
+ else:
+ # This technically makes the assumption that tests/python/<target> will be testing
+ # src/python/<target>. To change to honest measurements, do target.walk() here insead,
+ # however this results in very useless and noisy coverage reports.
+ modules = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources)
+
+ def as_list(iterable):
+ return '\n' + '\n'.join(iterable)
+
+ cp.set('run', 'source', as_list(modules))
+ cp.set('run', 'data_file', os.path.join(coverage_root, 'data'))
+ cp.set('run', 'parallel', True)
+ return coverage_root
+
+ @classmethod
+ def write_coverage_sitecustomize(cls, builder):
+ coverage_root = cls.coverage_root(builder)
+ with safe_open(os.path.join(coverage_root, 'sitecustomize.py'), 'wb') as fp:
+ fp.write(dedent('''
+ import os
+ try:
+ from coverage import coverage
+ cov = coverage(config_file=os.path.join(%r, 'config'), auto_data=True)
+ cov.start()
+ except ImportError:
+ pass
+ ''' % coverage_root))
+ return coverage_root
+
+ @classmethod
+ def write_pytest_runner(cls, builder):
+ builder.chroot().write(pkgutil.get_data(__name__, 'pytest_runner.py'), 'pytest_runner.py')
+
+ def __init__(self,
+ target,
+ root_dir,
+ interpreter=None,
+ coverage=False,
+ conn_timeout=None,
+ redirect=False,
+ timeout=None):
+ """
+ :param target: The PythonTests target to chroot
+ :param root_dir: The build root of the repository.
+ :keyword interpreter: If None, use the current interpreter, otherwise suppled
+ PythonInterpreter.
+ :keyword coverage: If True, enable coverage reporting for this test.
+ :keyword conn_timeout: Use this connection timeout for all downstream resolvers.
+ :keyword redirect: If True, redirect stdout to a file. Otherwise map to sys.stdout
+ :keyword timeout: The Amount of Time this test has to complete before being cancelled.
+ """
+ if not isinstance(target, PythonTests):
+ raise TypeError('PythonTestChroot takes a PythonTests object.')
+ self.target = target
+ self.root_dir = root_dir
+ self._builder = self._chroot = None
+ self.coverage = coverage
+ self.interpreter = interpreter or PythonInterpreter.get()
+ self._conn_timeout = conn_timeout
+ self._built = False
+ self._recordio = None
+ self.results = set()
+ self.acknowledged_results = set()
+ self.state = None
+ self._processed_footer = False
+ self._redirect = redirect
+ self._expiration = None
+ self._timeout = timeout or Amount(1000, Time.HOURS)
+ # Pants is not threadsafe -- calling self.target.sources actually
+ # changes the cwd if it's never been called before. I spent almost 5
+ # hours tracking down nondeterministic bugs caused by this. Moving from
+ # .popen to .__init__ fixes it, since the builds are serialized, but
+ # this seems brittle.
+ self._sources = [os.path.abspath(os.path.join(self.target.target_base, source))
+ for source in self.target.sources]
+
+ def build(self):
+ builder = PEXBuilder()
+ builder.info.entry_point = 'pytest'
+ builder.info.ignore_errors = self.target._soft_dependencies
+ self._chroot = PythonChroot(
+ self.target,
+ self.root_dir,
+ interpreter=self.interpreter,
+ platforms=('current',), # only build tests for the current platform
+ extra_targets=self.generate_test_targets(PythonInterpreter.get()),
+ builder=builder,
+ conn_timeout=self._conn_timeout)
+ self._builder = self._chroot.dump()
+ self.write_pytest_runner(self._builder)
+ self._builder.freeze()
+ self._recordio_path = os.path.join(self._builder.path(), 'tests.recordio')
+ self._recordio = open(self._recordio_path, 'wb+')
+ self._built = True
+ self._stdout = None
+
+ def pytest_runner_args(self):
+ return ['-p', 'pytest_runner', '--pants_test_recordio', self._recordio_path]
+
+ def popen(self, args):
+ if not self._built:
+ self.build()
+ test_args = self.generate_junit_args(self.target)
+ test_args.extend(self.pytest_runner_args())
+ if self._redirect:
+ # If we are redirecting for the purpose of the parallel test runner, make output as quiet
+ # as possible but capture stdout/stderr of the processes so that they can be printed.
+ test_args.append('-qs')
+ test_args.extend(args)
+ env = os.environ.copy()
+ if self.coverage:
+ test_args.append(
+ '--pants_test_coverage=%s' % self.write_coverage_setup(self.target, self._builder))
+ coverage_root = self.write_coverage_sitecustomize(self._builder)
+ pythonpath = [coverage_root] + list(filter(None, env.get('PYTHONPATH', '').split(':')))
+ env['PYTHONPATH'] = ':'.join(pythonpath)
+ if self._redirect:
+ self._stdout = open(os.path.join(self._builder.path(), 'stdout'), 'w')
+ pex = PEX(self._builder.path(), interpreter=self.interpreter)
+ self._po = pex.run(
+ args=test_args + self._sources,
+ blocking=False,
+ setsid=True,
+ stdout=self._stdout,
+ env=env,
+ stderr=subprocess.STDOUT)
+ self._expiration = time.time() + self._timeout.as_(Time.SECONDS)
+ return self._po
+
+ @property
+ def output(self):
+ if not self._redirect:
+ return ''
+ with open(os.path.join(self._builder.path(), 'stdout')) as fp:
+ return fp.read()
+
+ @property
+ def finished(self):
+ return self.results == self.acknowledged_results and self._processed_footer
+
+ def _process_record(self, record):
+ if isinstance(record, PythonTestResult):
+ self.results.add(record)
+ return record
+ elif isinstance(record, PythonTestFooter):
+ self._processed_footer = True
+ if self._stdout:
+ self._stdout.close()
+ self._po.wait()
+
+ def read(self):
+ if not self._processed_footer and time.time() > self._expiration:
+ self.state = 'timeout'
+ self.kill()
+ return
+ record = read_pytest_record(self._recordio)
+ if record is not None:
+ return self._process_record(record)
+
+ def acknowledge(self, result):
+ self.acknowledged_results.add(result)
+
+ def mark_skipped(self):
+ self.kill(state='skipped')
+
+ @property
+ def outcome(self):
+ if not self.finished and self.state is None:
+ return None
+ if self.state == 'timeout':
+ return PythonTestOutcome.timeout()
+ elif self.state == 'skipped':
+ return PythonTestOutcome.skipped()
+ elif self.state == 'cancelled':
+ return PythonTestOutcome.cancelled()
else:
- # This technically makes the assumption that tests/python/<target> will be testing
- # src/python/<target>. To change to honest measurements, do target.walk() here instead,
- # however this results in very useless and noisy coverage reports.
- source = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources)
- args = ['-p', 'pytest_cov',
- '--cov-config', filename,
- '--cov-report', 'html',
- '--cov-report', 'term']
- for module in source:
- args.extend(['--cov', module])
- return filename, args
-
- @staticmethod
- def wait_on(popen, timeout=TEST_TIMEOUT):
- total_wait = Amount(0, Time.SECONDS)
- while total_wait < timeout:
- rc = popen.poll()
- if rc is not None:
- return PythonTestResult.rc(rc)
- total_wait += PythonTestBuilder.TEST_POLL_PERIOD
- time.sleep(PythonTestBuilder.TEST_POLL_PERIOD.as_(Time.SECONDS))
- popen.kill()
- return PythonTestResult.timeout()
-
- def _run_python_test(self, target):
- po = None
- rv = PythonTestResult.exception()
- coverage_rc = None
- coverage_enabled = 'PANTS_PY_COVERAGE' in os.environ
+ success = all(result.outcome in ('skipped', 'passed') for result in self.results)
+ return PythonTestOutcome.success() if success else PythonTestOutcome.failure()
+
+ def iter_coverage_data(self):
+ """Yield source_root, data file tuples from this chroot."""
+ if self.coverage:
+ coverage_root = os.path.join(self._builder.path(), self.COVERAGE_ROOT)
+ for path in os.listdir(coverage_root):
+ if path.startswith('data'):
+ yield (self._builder.path(), os.path.join(coverage_root, path))
+
+ def _kill(self):
+ if self._po:
+ self._po.poll()
+ if self._po and self._po.returncode != 0:
+ try:
+ os.killpg(self._po.pid, signal.SIGTERM)
+ except OSError as e:
+ print('Failed to kill process group %s: %s' % (self._po.pid, e))
- try:
- builder = PEXBuilder(interpreter=self.interpreter)
- builder.info.entry_point = target.entry_point
- builder.info.ignore_errors = target._soft_dependencies
- chroot = PythonChroot(
- target,
- self.root_dir,
- extra_targets=self.generate_test_targets(),
- builder=builder,
- interpreter=self.interpreter,
- conn_timeout=self._conn_timeout)
- builder = chroot.dump()
- builder.freeze()
- test_args = PythonTestBuilder.generate_junit_args(target)
- test_args.extend(self.args)
- if coverage_enabled:
- coverage_rc, args = self.cov_setup(target, builder.chroot())
- test_args.extend(args)
- sources = [os.path.join(target.target_base, source) for source in target.sources]
- po = PEX(builder.path(), interpreter=self.interpreter).run(
- args=test_args + sources, blocking=False, setsid=True)
- # TODO(wickman) If coverage is enabled, write an intermediate .html that points to
- # each of the coverage reports generated and webbrowser.open to that page.
- rv = PythonTestBuilder.wait_on(po, timeout=target.timeout)
- except Exception as e:
- import traceback
- print('Failed to run test!', file=sys.stderr)
- traceback.print_exc()
- rv = PythonTestResult.exception()
- finally:
- if coverage_rc:
- os.unlink(coverage_rc)
- if po and po.returncode != 0:
- try:
- os.killpg(po.pid, signal.SIGTERM)
- except OSError as e:
- if e.errno == errno.EPERM:
- print("Unable to kill process group: %d" % po.pid)
- elif e.errno != errno.ESRCH:
- rv = PythonTestResult.exception()
- self.successes[target._create_id()] = rv
- return rv
-
- def _run_python_test_suite(self, target, fail_hard=True):
+ def kill(self, state='cancelled'):
+ if not self.finished:
+ self.state = state
+ self._kill()
+
+
+class PythonTestBuilder(object):
+ class Error(Exception): pass
+ class InvalidDependencyException(Error): pass
+
+ # TODO(wickman) Expose these as configurable parameters
+ TEST_TIMEOUT = Amount(2, Time.MINUTES)
+ TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS)
+
+ def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None):
+ self.targets = targets
+ self.args = args
+ self.root_dir = root_dir
+ self.successes = {}
+ self._display = Display(sys.stdout)
+ self.interpreter = interpreter or PythonInterpreter.get()
+ self.parallelism = max(1, int(os.environ.get('PANTS_PY_PARALLELISM', 1)))
+ self.cancelled = threading.Event()
+ self._producers = 0
+ self._producer_lock = threading.Lock()
+ self.done = threading.Event()
+ self._conn_timeout = conn_timeout
+ self._queue = Queue()
+ self.coverage = 'PANTS_PY_COVERAGE' in os.environ
+ self.coverage_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage')
+ if self.coverage:
+ safe_mkdir(self.coverage_dir, clean=True)
+ self._equivalent_prefixes = OrderedSet([])
+ self._data_files = []
+
+ def build_chroot(self, target):
+ chroot = PythonTestChroot(
+ target,
+ self.root_dir,
+ interpreter=self.interpreter,
+ coverage='PANTS_PY_COVERAGE' in os.environ,
+ redirect=self.parallelism > 1,
+ conn_timeout=self._conn_timeout)
+ chroot.build()
+ return chroot
+
+ def drain_queue(self, chroot):
+ while True:
+ result = chroot.read()
+ if result:
+ self._queue.put((chroot, result))
+ chroot.acknowledge(result)
+ else:
+ break
+
+ def _collect_source_roots(self, target):
+ for dependency in target.dependencies:
+ for library in dependency.resolve():
+ if isinstance(library, PythonLibrary):
+ yield library.target_base
+
+ def _collect_coverage(self, chroot):
+ for prefix, data_file in chroot.iter_coverage_data():
+ self._equivalent_prefixes.add(os.path.realpath(prefix))
+ target_file = os.path.join(self.coverage_dir, os.path.basename(data_file))
+ shutil.copy(data_file, target_file)
+ self._data_files.append(target_file)
+
+ @classmethod
+ def _generate_coverage_config(cls, coverage_root, equivalent_prefixes):
+ with coverage_config(os.path.join(coverage_root, '.coveragerc')) as cp:
+ def as_list(iterable):
+ return '\n' + '\n'.join(iterable)
+ cp.set('run', 'data_file', os.path.join(coverage_root, 'data'))
+ cp.add_section('paths')
+ cp.set('paths', 'source', as_list(equivalent_prefixes))
+ return os.path.join(coverage_root, '.coveragerc')
+
+ @classmethod
+ @contextmanager
+ def _gen_cov(cls, coveragerc):
+ from coverage import coverage
+ cov = coverage(config_file=coveragerc)
+ cov.combine()
+ yield cov
+
+ def run_report(self, source_root):
+ self._display.center(' coverage report source root: %s ' % source_root)
+ with temporary_dir() as td:
+ for filename in os.listdir(self.coverage_dir):
+ if filename.startswith('data'):
+ shutil.copy(os.path.join(self.coverage_dir, filename), os.path.join(td, filename))
+ coveragerc = self._generate_coverage_config(td,
+ [source_root] + list(self._equivalent_prefixes))
+ with self._gen_cov(coveragerc) as cov:
+ cov.report(show_missing=False)
+ self._display.center('')
+ self._display.write('HTML report: file://%s' % os.path.realpath(
+ os.path.join(self.coverage_dir, source_root, 'index.html')))
+ cov.html_report(directory=os.path.join(self.coverage_dir, source_root))
+ self._display.center('')
+ print('\n')
+
+ def _run_one(self, chroot):
+ if self.cancelled.is_set():
+ with self._producer_lock:
+ self._producers -= 1
+ chroot.mark_skipped()
+ return
+
+ chroot.popen(self.args)
+
+ while not chroot.finished and not self.cancelled.is_set():
+ self.drain_queue(chroot)
+ self.cancelled.wait(timeout=0.2)
+ self.drain_queue(chroot)
+
+ if self.cancelled.is_set():
+ chroot.kill()
+
+ self._collect_coverage(chroot)
+
+ with self._producer_lock:
+ self._producers -= 1
+
+ def _iter_test_suite(self, target):
tests = OrderedSet([])
def _gather_deps(trg):
if isinstance(trg, PythonTests):
@@ -243,33 +615,112 @@ class PythonTestBuilder(object):
for dep in dependency.resolve():
_gather_deps(dep)
_gather_deps(target)
+ return iter(tests)
+
+ def _iter_tests(self, targets):
+ for target in targets:
+ if isinstance(target, PythonTests):
+ yield target
+ elif isinstance(target, PythonTestSuite):
+ for subtarget in self._iter_test_suite(target):
+ yield subtarget
+ else:
+ raise self.InvalidDependencyException(
+ "Invalid dependency in python test target: %s" % target)
+
+ def _parallel_test_driver(self, tests):
+ from concurrent import futures
+ from concurrent.futures.thread import ThreadPoolExecutor
+ with ThreadPoolExecutor(self.parallelism) as executor:
+ fs = [executor.submit(self._run_one, chroot) for test, chroot in tests.items()]
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
+ for future in fs:
+ if future.exception():
+ print('Future %s excepted: %s' % (future, future.exception()))
+ self.done.set()
+
+ def run(self):
+ tests = {}
+ failed_chroots = set()
+ self._display.center(' building chroots ')
+ for test in self._iter_tests(self.targets):
+ try:
+ tests[test] = self.build_chroot(test)
+ result = Display.render('finished', 'green')
+ except DistributionNotFound:
+ failed_chroots.add(test)
+ result = Display.render('failed', 'red')
+ self._display.left_right(test.address.compact_repr(), result)
- failed = False
- for test in tests:
- rv = self._run_python_test(test)
- if not rv.success:
- failed = True
- if fail_hard:
- return rv
- return PythonTestResult.rc(1 if failed else 0)
+ try:
+ self._run_tests(tests)
+ except KeyboardInterrupt:
+ self.cancelled.set()
+
+ self.done.wait()
+
+ self._display.center(' result by target ')
+ for target, chroot in sorted(tests.items()):
+ self._display.left_right(target.address.compact_repr(), chroot.outcome)
+ if chroot.outcome.failed:
+ output = chroot.output
+ if output:
+ self._display.center(' error output ')
+ self._display.write('\n'.join(chroot.output.splitlines()))
+ self._display.center('')
+ for target in failed_chroots:
+ self._display.left_right(target.address.compact_repr(), PythonTestOutcome.exception())
+
+ if self.coverage:
+ source_roots = set()
+ for target in tests:
+ source_roots.update(self._collect_source_roots(target))
+ for source_root in source_roots:
+ self.run_report(source_root)
+
+ success = all(chroot.outcome is not None and chroot.outcome.success for chroot in tests.values())
+ success = False if failed_chroots else success
+ return 0 if success else 1
+
+ def _run_tests(self, tests):
+ if not tests:
+ self.done.set()
+ return
+
+ self._producers = len(tests)
- def _run_tests(self, targets):
fail_hard = 'PANTS_PYTHON_TEST_FAILSOFT' not in os.environ
+
if 'PANTS_PY_COVERAGE' in os.environ:
# Coverage often throws errors despite tests succeeding, so make PANTS_PY_COVERAGE
# force FAILSOFT.
fail_hard = False
- failed = False
- for target in targets:
- if isinstance(target, PythonTests):
- rv = self._run_python_test(target)
- elif isinstance(target, PythonTestSuite):
- rv = self._run_python_test_suite(target, fail_hard)
- else:
- raise PythonTestBuilder.InvalidDependencyException(
- "Invalid dependency in python test target: %s" % target)
- if not rv.success:
- failed = True
- if fail_hard:
- return rv
- return PythonTestResult.rc(1 if failed else 0)
+
+ test_runner_thread = threading.Thread(target=lambda: self._parallel_test_driver(tests))
+ test_runner_thread.start()
+
+ if self.parallelism > 1:
+ self._display.center(' result by test ')
+
+ COLOR_MAP = {
+ 'passed': 'green',
+ 'failed': 'red',
+ 'skipped': 'yellow',
+ }
+
+ def result_str(result):
+ return Display.render(result.outcome, COLOR_MAP.get(result.outcome, 'yellow'))
+
+ while self._producers > 0:
+ while True:
+ try:
+ chroot, result = self._queue.get(timeout=0.2)
+ self._queue.task_done()
+ except Empty:
+ break
+ # If running in parallel mode, serialize test results ourselves.
+ if self.parallelism > 1:
+ self._display.left_right(
+ '%s:%s: %s' % (result.filename, result.lineno, result.function), result_str(result))
+ if result.outcome != 'passed' and fail_hard:
+ self.cancelled.set()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment