#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
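#
# This script estimates per-test pass rates for a googletest binary by running
# it repeatedly until the uncertainty of each rate drops below the target
# `--sigma`, optionally persisting results to a JSON database.
#
# Usage sketch (added for illustration; the script and executable names below
# are placeholders, not part of the original gist):
#
#   ./check_flakiness.py -e ./my-gtest-binary -s 0.05 -d results.json
#   ./check_flakiness.py -r -d results.json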
import argparse
import copy
import json
import logging
import math
import os
import re
import subprocess
import sys

try:
    import pytest  # pylint: disable=import-error
except ModuleNotFoundError:
    pass
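
# Note (added for reference; the formats are taken from the sample log in
# `test_parse_output` below): `parse_output` recognizes googletest lines such as
#
#   [ RUN      ] Suite.Test
#   [       OK ] Suite.Test (0 ms)
#   [  FAILED  ] Suite.Test (0 ms)
#
# A `RUN` line without a matching result line counts as a failed run, which is
# how e.g., segfaulted tests are recorded.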


def parse_output(output):
    tests = {}
    for l in output.split("\n"):
        # We explicitly handle tests which never return a result to
        # deal with e.g., segfaults.
        m1 = re.match(r"^\[\s*RUN*\s*\]\s([^\s]*)", l)
        m2 = re.match(r"^\[\s*([A-Z]*)\s*\]\s([^\s]*)\s\((.*)\)", l)
        if m1:
            test = m1.group(1)
            if test not in tests:
                tests[test] = {"passed": []}
            tests[test]["passed"].append(0)
        elif m2:
            test = m2.group(2)
            passed = {"OK": 1, "FAILED": 0}[m2.group(1)]
            # duration = m2.group(3)
            if test not in tests:
                # A result line without a preceding `RUN` line; start a new
                # entry instead of indexing into an empty list.
                tests[test] = {"passed": [passed]}
            else:
                tests[test]["passed"][-1] = passed
    return tests


def test_parse_output():
    log = (
        "Note: Google Test filter = *-:ContentType/AgentAPIStreamingTest.Aa\n"
        "[==========] Running 352 tests from 55 test cases.\n"
        "[----------] Global test environment set-up.\n"
        "[----------] 1 test from AdaptorTest\n"
        "[ RUN      ] AdaptorTest.Reversed\n"
        "[       OK ] AdaptorTest.Reversed (0 ms)\n"
        "[----------] 1 test from AdaptorTest (0 ms total)\n"
        "\n"
        "[----------] 16 tests from ArchiverTest\n"
        "[ RUN      ] ArchiverTest.ExtractEmptyInputFile\n"
        "[       OK ] ArchiverTest.ExtractEmptyInputFile (1 ms)\n"
        "[ RUN      ] ArchiverTest.ExtractInputFileNotFound\n"
        "[       OK ] ArchiverTest.ExtractInputFileNotFound (0 ms)\n"
        "[ RUN      ] ArchiverTest.ExtractInputFileNotFound\n"
        "[  FAILED  ] ArchiverTest.ExtractInputFileNotFound (0 ms)\n"
        "[ RUN      ] Foo/Bar.Baz/0\n"
        "[       OK ] Foo/Bar.Baz/0 (0 ms)\n"
        "[ RUN      ] Foo/Bar.Baz/1\n"
        "[       OK ] Foo/Bar.Baz/1 (0 ms)\n"
        "[ RUN      ] ArchiverTest.ABORTED"
    )
    tests = parse_output(log)
    assert len(tests) == 6
    assert len(tests["AdaptorTest.Reversed"].keys()) == 1
    assert tests["AdaptorTest.Reversed"]["passed"] == [1]
    assert tests["ArchiverTest.ExtractEmptyInputFile"]["passed"] == [1]
    assert tests["ArchiverTest.ExtractInputFileNotFound"]["passed"] == [1, 0]
    assert tests["ArchiverTest.ABORTED"]["passed"] == [0]
    assert tests["Foo/Bar.Baz/0"]["passed"] == [1]
    assert tests["Foo/Bar.Baz/1"]["passed"] == [1]


def calculate_statistic(runs):
    assert runs
    passed = sum(runs)
    total = len(runs)
    p = 1.0 * passed / total
    # The count uncertainties follow Poisson statistics.
    dpassed = max(1, math.sqrt(passed))
    dtotal = max(1, math.sqrt(total))
    # We approximately estimate the resulting (squared) uncertainty with
    # Gaussian propagation.
    sigma2 = (passed ** 2 * dtotal ** 2 + dpassed ** 2 * total ** 2) / total ** 4
    return total, dtotal, passed, dpassed, p, math.sqrt(sigma2)
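
# Worked example (added for illustration, values rounded): for runs == [1, 1, 0, 1]
# we get passed == 3, total == 4, p == 0.75, dpassed == sqrt(3), dtotal == 2, and
# sigma == sqrt((3 ** 2 * 2 ** 2 + 3 * 4 ** 2) / 4 ** 4) == sqrt(84 / 256), about 0.57.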


def estimate_required(runs, sigma):
    assert sigma != 0
    if not runs:
        return 0
    total, _, _, _, p, sigma_ = calculate_statistic(runs)
    if sigma_ <= sigma:
        # If the uncertainty is smaller than the target we are done.
        return 0
    # Otherwise estimate the number of additional runs.
    #
    # This relation can be derived by assuming Gaussian propagation
    # of uncertainties in p=a/b with count uncertainties
    # dn=sqrt(n) and stable p. This gives us a relation for the
    # uncertainty of p
    #
    #     dp**2 = ((a*db)**2 + (da*b)**2)/b**4
    #
    # which has two limiting cases:
    #
    #     b = da/dp            for p==0
    #     b = p*(p+1)/dp**2    else
    #
    # Setting the target value dp==sigma and reducing by the
    # number of already executed runs gives the relation. We add
    # one additional run so that a computed value of e.g., 22.3
    # would lead to the required 23 runs.
    if p == 0:
        b = int(1 / sigma)
    else:
        # NOTE: If p is currently overestimated we might execute too
        # many runs. This can especially be an issue when we do not
        # have good statistics, yet. Consider reducing p, e.g., by its
        # current uncertainty.
        b = int((p ** 2 + p) / sigma ** 2 + 1)
    return max(1, b - total)
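
# Worked example (added for illustration): for a single passing run, runs == [1],
# we have p == 1 and total == 1, so the relation above gives
# b == int((1 + 1) / 0.1 ** 2 + 1) == 200 total runs for sigma == 0.1, i.e. 199
# additional runs (cf. `test_estimate_required` below).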


def test_estimate_required():
    sigma = 0.1
    estimate = estimate_required([], sigma)
    assert estimate == 0
    estimate = estimate_required([1], sigma)
    assert estimate == 199
    estimate = estimate_required([1] * 2, sigma)
    assert estimate == 198
    estimate = estimate_required([1] * 200, sigma)
    assert estimate == 0
    estimate = estimate_required([0], sigma)
    assert estimate == 9
    estimate = estimate_required([0] * 2, sigma)
    assert estimate == 8
    estimate = estimate_required([0] * 8, sigma)
    assert estimate == 2
    estimate = estimate_required([0] * 200, sigma)
    assert estimate == 0


def summarize(results):
    print("{:<70} | {:<7} | {:<7} | {:<7}".format("Test", "p", "s", "Runs"))
    print("-" * 70 + "-|-" + "-" * 7 + "-|-" + "-" * 7 + "-|-" + "-" * 7)
    for k, v in results.items():
        total, _, _, _, p, sigma = calculate_statistic(v["passed"])
        print("{:<70} | {:0.5f} | {:0.5f} | {:>7}".format(k, p, sigma, total))


def load_db(path):
    # FIXME(bbannier): Handle `FileExistsError` for lock file, e.g., retry.
    lock = path + ".lock"
    with open(lock, "x"):
        try:
            with open(path, "r") as f:
                data = json.load(f)
        except FileNotFoundError as e:
            raise e
        finally:
            os.remove(lock)
    return data


def test_load_db():
    import shutil
    import tempfile

    try:
        directory = tempfile.mkdtemp()
        db = os.path.join(directory, "db")

        # If the database does not exist we raise a `FileNotFoundError`.
        with pytest.raises(FileNotFoundError):
            load_db(db)

        # If no lock file exists the database can be loaded.
        r1 = {"foo": "bar"}
        save_db(r1, db)
        r2 = load_db(db)
        assert r2 == r1

        # If a lock file exists the database cannot be loaded.
        open(db + ".lock", "a").close()
        with pytest.raises(FileExistsError):
            load_db(db)
    finally:
        shutil.rmtree(directory)


def save_db(db, path):
    # FIXME(bbannier): We might lose data if the dump is interrupted.
    # Consider using a temporary file.
    #
    # FIXME(bbannier): Handle `FileExistsError` for lock file, e.g., retry.
    lock = path + ".lock"
    with open(lock, "x"):
        try:
            with open(path, "w") as f:
                json.dump(db, f)
        finally:
            os.remove(lock)


def test_save_db():
    import shutil
    import tempfile

    try:
        directory = tempfile.mkdtemp()
        db = os.path.join(directory, "db")

        # If the database does not exist we can serialize it to a new file.
        r1 = {"foo": "bar"}
        save_db(r1, db)
        r2 = load_db(db)
        assert r2 == r1

        # If the database exists we can serialize over it.
        r3 = {"foo": "baz"}
        save_db(r3, db)
        r4 = load_db(db)
        assert r4 == r3

        # If a lock file exists the database cannot be saved.
        open(db + ".lock", "a").close()
        with pytest.raises(FileExistsError):
            save_db(r4, db)
    finally:
        shutil.rmtree(directory)
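
# Illustrative sketch (not part of the original gist) of the retry handling
# suggested in the `FIXME` comments above: instead of failing immediately with
# `FileExistsError` while another process holds the lock, wait and try again.
# The helper name and parameters are made up and nothing in this script uses
# it; a caller could write e.g., `retry_on_lock(load_db, path)` or
# `retry_on_lock(save_db, db, path)`.
def retry_on_lock(operation, *args, attempts=10, delay=0.5):
    import time

    for _ in range(attempts - 1):
        try:
            return operation(*args)
        except FileExistsError:
            time.sleep(delay)
    # Let a `FileExistsError` from the final attempt propagate to the caller.
    return operation(*args)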


def main(exe, sigma, db=None, max_repeat=None):
    def agg_estimate_required(results):
        estimates = {}
        for test in results:
            e = estimate_required(results[test]["passed"], sigma)
            if e > 0:
                estimates[test] = e
        estimate = min(estimates.values()) if estimates else 0
        if max_repeat:
            estimate = min(max_repeat, estimate)
        required_tests = map(
            lambda x: x[0], filter(lambda x: x[1] > 0, estimates.items())
        )
        return estimate, required_tests

    # FIXME(bbannier): This should be a member function of a proper result
    # class.
    def merge_results(results1, results2):
        results = copy.deepcopy(results1)
        for test in results2:
            if test not in results:
                results[test] = {"passed": []}
            results[test]["passed"].extend(results2[test]["passed"])
        return results

    try:
        RESULTS = load_db(db) if db else {}
    except FileNotFoundError:
        RESULTS = {}

    results = {}
    required_runs = 1
    required_tests = []
    initial_run = True
    aborted = []

    logging.log(logging.INFO, "Performing initial run")

    while required_runs > 0:
        # We always need to execute at least once since the tests read from
        # file could be different from the tests selected by the current
        # filter.
        argv = [exe, "--gtest_repeat={}".format(required_runs)]
        if required_tests:
            argv.append("--gtest_filter={}".format(":".join(required_tests)))

        try:
            results_ = parse_output(
                subprocess.check_output(
                    argv, stderr=subprocess.STDOUT, universal_newlines=True
                )
            )
            aborted.append(False)
        except subprocess.CalledProcessError as e:
            # Processes terminated with a signal have negative return
            # codes where the value corresponds to the signal.
            aborted.append(e.returncode == -6)
            results_ = parse_output(e.output)

        if not results_:
            break

        if initial_run:
            initial_run = False
            for test in results_:
                if test in RESULTS:
                    results[test] = copy.deepcopy(RESULTS[test])
                    results[test]["passed"].extend(results_[test]["passed"])
                else:
                    results[test] = copy.deepcopy(results_[test])
        else:
            results = merge_results(results, results_)

        required_runs, required_tests = agg_estimate_required(results)

        # Log progress.
        if required_runs > 0 and not initial_run:
            stats = {
                test: calculate_statistic(results[test]["passed"]) for test in results
            }
            test = max(stats.items(), key=lambda x: x[1][5])[0]
            sigma_ = stats[test][5]
            p = stats[test][4]
            logging.info(
                "Executing {} more runs since result for {}"
                " has sigma={:0.5f} at p={:0.5f} which is more than the"
                " required {:0.5f}".format(required_runs, test, sigma_, p, sigma)
            )

        if db:
            # Reload the results database since it might have changed on disk.
            #
            # FIXME(bbannier): We might still lose some data if there
            # is another write between this load and the save below.
            # Ideally we'd perform this all under a single lock.
            try:
                RESULTS = load_db(db) if db else {}
            except FileNotFoundError:
                RESULTS = {}
            for test in results:
                RESULTS[test] = copy.deepcopy(results[test])
            save_db(RESULTS, db)

    summarize(results)

    if all(aborted):
        logging.log(
            logging.WARNING, "All test runs aborted, results might be incomplete"
        )
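
# For illustration (not part of the original gist): `main` can also be driven
# directly from Python, e.g.,
#
#   main("./my-gtest-binary", sigma=0.05, db="results.json", max_repeat=10)
#
# where the executable and database paths are placeholders.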


def report(db):
    try:
        results = json.load(open(db)) if db else {}
    except FileNotFoundError:
        results = {}
    summarize(results)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--executable", help="Test executable.")
    parser.add_argument(
        "-s",
        "--sigma",
        type=float,
        default=0.1,
        help="Required absolute rate significance. Note that the "
        "number of needed runs roughly scales like 1/s^2. "
        "DEFAULT: 0.1.",
    )
    parser.add_argument(
        "-d",
        "--db",
        help="Optional path to the database for storing results. DEFAULT: unset.",
    )
    parser.add_argument(
        "-r",
        "--report",
        action="store_true",
        help="Just report statistics from the database, do not execute. "
        "DEFAULT: False.",
    )
    parser.add_argument(
        "-m",
        "--max_repeat",
        type=int,
        help="Limit the number of repetitions to execute at a time. "
        "If using a database it will be updated after each repetition. "
        "DEFAULT: no limitation.",
    )

    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)

    if args.report:
        if not args.db:
            print("Cannot report without database file", file=sys.stderr)
            sys.exit(1)
        report(args.db)
        sys.exit(0)

    if not args.executable:
        parser.print_usage()
        sys.exit(1)

    main(args.executable, args.sigma, args.db, args.max_repeat)