Skip to content

Instantly share code, notes, and snippets.

@pkhuong
Last active December 15, 2021 23:25
Show Gist options
  • Save pkhuong/fe81822fd6adab723f91601f39dce4fb to your computer and use it in GitHub Desktop.
Save pkhuong/fe81822fd6adab723f91601f39dce4fb to your computer and use it in GitHub Desktop.
MIT-licensed support code for coverage-guided Hypothesis testing
import functools
import os
import re
import threading
import time
import cffi
__all__ = ["teardown", "reset", "report", "wrap_fn"]
SELF_DIR = os.path.dirname(os.path.abspath(__file__))
BTS_DECLARATIONS = """
struct bts_aux_record {
uint64_t from_addr; /* from and to are instruction addresses. */
uint64_t to_addr;
uint64_t flags; /* 0x10 = predicted, in theory, maybe. */
};
/*
* This function must be called with the value in
* `/sys/bus/event_source/devices/intel_bts/type` before calling
* `bts_setup`.
*/
void bts_init(int detected_bts_perf_type);
/*
* Cleans up any BTS state for the current thread.
*/
void bts_teardown(void);
/*
* Overwrites or creates the BTS state for the current thread, with
* an auxiliary (tracing) buffer of `aux_size` bytes. `aux_bytes`
* must be a power of two and must be at least one page.
*
* Returns 0 on success, negative on failure.
*/
int bts_setup(size_t aux_size);
/*
* Enables branch tracing for the calling thread, which must have
* a BTS state (i.e., only call `bts_start` after `bts_setup`).
*
* Returns 0 on success, negative on failure.
*/
int bts_start(void);
/*
* Stops branch tracing for the current thread, and returns a
* temporary (thread-local) buffer of the BTS records since
* the last call to `bts_start`.
*
* The first argument is overwritten with the number of valid
* records in the return value, or a negative count on error.
*
* When `(*OUT_num_elements + 2) * sizeof(struct bts_aux_record)`
* exceeds the `aux_size` passed to `bts_setup`, tracing may have
* exhausted the buffer space and stopped early. This trace
* truncation does not affect the execution of the traced program.
*/
const struct bts_aux_record *bts_stop(ssize_t *OUT_num_elements);
"""
DEFAULT_AUX_SIZE = 2 ** 25
FFI = None
BTS = None
ENABLED = False
BTS_TYPE = None
BTS_TYPE_PATH = "/sys/bus/event_source/devices/intel_bts/type"
try:
with open(BTS_TYPE_PATH, "r") as f:
BTS_TYPE = int(f.read())
ENABLED = True
except:
pass
def _init_bts():
BTS.bts_init(BTS_TYPE)
if ENABLED:
FFI = cffi.FFI()
FFI.cdef(BTS_DECLARATIONS)
BTS = FFI.dlopen(SELF_DIR + "/libbts.so")
FFI.init_once(_init_bts, "init_bts")
def find_current_mappings():
ret = []
with open("/proc/self/maps", "r") as f:
for line in f:
m = re.match(r"^([0-9a-f]+)-([0-9a-f]+) r-xp .*", line)
if m:
mapping = (int(m.group(1), 16), int(m.group(2), 16))
ret.append(mapping)
return ret
BASELINE_MAPPINGS = find_current_mappings()
def address_in_baseline_map(x):
for mapping in BASELINE_MAPPINGS:
if mapping[0] <= x < mapping[1]:
return True
return False
FULLY_SETUP = threading.local()
def teardown():
if not ENABLED:
return
BTS.bts_teardown()
FULLY_SETUP.setup = False
def ensure_setup(buffer_size=DEFAULT_AUX_SIZE):
if not ENABLED or getattr(FULLY_SETUP, "setup", None):
return
assert BTS.bts_setup(buffer_size) == 0
FULLY_SETUP.setup = True
EDGE_BUFFER = threading.local()
def reset(buffer_size=DEFAULT_AUX_SIZE):
if not ENABLED:
return
EDGE_BUFFER.buffer = []
EDGE_BUFFER.call_count = 0
assert BTS.bts_setup(buffer_size) == 0
FULLY_SETUP.setup = True
MIN_ADDRESS = 2 ** 12
MAX_ADDRESS = 2 ** 63 - 1
ALL_SEEN_EDGES = dict()
USELESS_EDGES = set()
initial_time = time.time()
def hash_report(od_pairs):
global BEST_VALUES
"""Sketches the *unique* origin/destination pairs into an array of values"""
ret = list()
seen = set(USELESS_EDGES)
for pair in od_pairs:
# Skip kernel addresses.
if pair[0] > MAX_ADDRESS or pair[1] > MAX_ADDRESS:
continue
if pair[0] < MIN_ADDRESS or pair[1] < MIN_ADDRESS:
continue
if pair in seen:
continue
if address_in_baseline_map(pair[0]) or address_in_baseline_map(pair[1]):
continue
if pair not in ALL_SEEN_EDGES:
print(
"%f new edge %i %s"
% (
time.time() - initial_time,
len(ALL_SEEN_EDGES),
(hex(pair[0]), hex(pair[1])),
)
)
ALL_SEEN_EDGES[pair] = len(ALL_SEEN_EDGES)
seen.add(pair)
ret.append(ALL_SEEN_EDGES[pair])
return ret
def update_useless_edges():
if not ENABLED:
return
num = FFI.new("ssize_t *")
ret = BTS.bts_stop(num)
for i in range(num[0]):
USELESS_EDGES.add((ret[i].from_addr, ret[i].to_addr))
for pair in getattr(EDGE_BUFFER, "buffer", []):
USELESS_EDGES.add(pair)
def report():
if not ENABLED:
return []
num = FFI.new("ssize_t *")
ret = BTS.bts_stop(num)
od_pairs = [(ret[i].from_addr, ret[i].to_addr) for i in range(num[0])] + getattr(
EDGE_BUFFER, "buffer", []
)
call_count = max(1, getattr(EDGE_BUFFER, "call_count", 0))
return call_count, hash_report(od_pairs)
def _start():
if not ENABLED or not getattr(FULLY_SETUP, "setup", None):
return
BTS.bts_start()
def _stop():
if not ENABLED or not getattr(FULLY_SETUP, "setup", None):
return
if getattr(EDGE_BUFFER, "buffer", None) is None:
EDGE_BUFFER.buffer = []
EDGE_BUFFER.call_count = 1 + getattr(EDGE_BUFFER, "call_count", 0)
num = FFI.new("ssize_t *")
ret = BTS.bts_stop(num)
for i in range(num[0]):
EDGE_BUFFER.buffer.append((ret[i].from_addr, ret[i].to_addr))
def wrap_fn(fn):
if not ENABLED or not callable(fn):
return fn
@functools.wraps(fn)
def wrapper(*arg, **kwargs):
try:
_start()
return fn(*arg, **kwargs)
finally:
_stop()
return wrapper
import faulthandler
import os
import re
from types import SimpleNamespace
import sanitizers
import cffi
SELF_DIR = os.path.dirname(os.path.abspath(__file__))
TOPLEVEL = os.path.abspath(SELF_DIR + "/../../../../") + "/"
PREDEFINITIONS = """
typedef ... regex_t;
/*
* While pycparser supports __int128, cffi does not. Provide these
* fake definitions for __[u]int128_t; they look the same to the ABI
* (in as much as there is an ABI for this extension).
*/
typedef struct {
uint64_t lo;
uint64_t hi;
} __uint128_t;
typedef struct {
uint64_t lo;
int64_t hi;
} __int128_t;
"""
REPLACEMENTS = {
r"STAILQ_HEAD\s*\(\s*(\w*)\s*,\s*(\w+)\s*\)": r"struct \1 { struct \2 *stqh_first; struct \2 **stqh_last; }",
r"STAILQ_ENTRY\s*\(\s*(\w+)\s*\)": r"struct { struct \1 *stqe_next; }",
}
PUBLIC_HEADERS = [
# ...
]
INTERNAL_HEADERS = [
# ...
]
STRIPPED_PREFIXES = [
"crdb_",
"CRDB_",
]
FFI = cffi.FFI()
def read_stripped_header(path):
"""Returns the contents of a header file without preprocessor directives."""
ret = ""
in_directive = False
with open(path) as f:
for line in f:
if in_directive or re.match(r"^\s*#", line):
in_directive = line.endswith("\\\n")
else:
in_directive = False
for pattern, repl in REPLACEMENTS.items():
line = re.sub(pattern, repl, line)
ret += line
return ret
FFI.cdef(PREDEFINITIONS)
for header in PUBLIC_HEADERS:
FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/" + header))
for header in INTERNAL_HEADERS:
FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/crdb/" + header))
if sanitizers.ENABLED:
C = FFI.dlopen(TOPLEVEL + "_build_asan/crdb/libcrdb.so")
else:
C = FFI.dlopen(TOPLEVEL + "_build/crdb/libcrdb.so")
def _strip_prefixes(lib):
"""Returns a namespace with every symbol in `lib` resolved eagerly,
and with aliases stripped of the `crdb_` prefix (and other entries
in STRIPPED_PREFIXES).
"""
ret = SimpleNamespace()
symbols = dir(lib)
for symbol in symbols:
# This is where we would wrap with BTS hooks.
fn = sanitizers.wrap_with_tracking(getattr(lib, symbol))
setattr(ret, symbol, fn)
for prefix in STRIPPED_PREFIXES:
if symbol.startswith(prefix):
suffix = symbol[len(prefix) :]
assert (
suffix not in symbols
), "Name collision when stripping %s from %s." % (prefix, symbol,)
setattr(ret, suffix, fn)
break
return ret
C = _strip_prefixes(C)
def _init_crdb():
error = FFI.new("crdb_error_t *")
assert C.init(FFI.NULL, error), "crdb_init: %s %i" % (
FFI.string(error.message),
error.error,
)
FFI.init_once(_init_crdb, "init_crdb")
# Pass in a copy of stderr in case anyone plays redirection tricks.
faulthandler.enable(os.dup(2))
Copyright 2020 Paul Khuong, Backtrace I/O, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import faulthandler
import os
from types import SimpleNamespace
import bts
import sanitizers
import cffi
# We assume a libFuzzer entry point.
DECLARATION = "int LLVMFuzzerTestOneInput(const uint8_t *, size_t);"
assert (
"FUZZ_TARGET" in os.environ
), "FUZZ_TARGET must point to a .so with a libFuzzer entry point"
FUZZ_TARGET = os.environ["FUZZ_TARGET"]
FFI = cffi.FFI()
FFI.cdef(DECLARATION)
C = FFI.dlopen(FUZZ_TARGET)
def _wrap_symbols(lib):
ret = SimpleNamespace()
symbols = dir(lib)
for symbol in symbols:
fn = sanitizers.wrap_with_tracking(bts.wrap_fn(getattr(lib, symbol)))
setattr(ret, symbol, fn)
return ret
C = _wrap_symbols(C)
# Pass in a copy of stderr in case anyone plays redirection tricks.
faulthandler.enable(os.dup(2))
#!/bin/sh
ASAN_OPTIONS=halt_on_error=1,leak_check_at_exit=0 LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libasan.so.5 FUZZ_TARGET=fuzzer-test-suite/openssl-1.0.1f-fsanitize.so pytest --capture=sys "$@"
"""
Hypothesis integration with LSan / ASan
"""
import atexit
import contextlib
import functools
import os
import re
import tempfile
import unittest
import cffi
import hypothesis.stateful
__all__ = [
"add_interesting_library",
"leaky_region",
"wrap_with_tracking",
"TestCase",
"RuleBasedStateMachine",
]
# When ASAN / LSAN is enabled, we have some extra instrumentation to
# assert against leaks, and locally disable leak detection.
#
# See https://github.com/gcc-mirror/gcc/blob/releases/gcc-6.3.0/libsanitizer/include/sanitizer/common_interface_defs.h
ASAN_DECLARATIONS = """
void __sanitizer_set_report_path(const char *path);
void __asan_set_error_report_callback(void (*callback)(const char*));
void __lsan_disable(void);
void __lsan_enable(void);
void __lsan_do_leak_check(void);
int __lsan_do_recoverable_leak_check(void);
"""
FFI = None
ASAN = None
# In order to load an ASAN library in a regular python binary, we must
# build the library with dynamic linkage to libasan (default on GCC,
# `-shared-libasan` for clang), and LD_PRELOAD libasan before invoking
# python.
#
# https://github.com/google/sanitizers/issues/918
#
# In order to enable ASAN, ASAN_OPTIONS must be defined, and LD_PRELOAD
# should be setup; for example,
#
# ASAN_OPTIONS=halt_on_error=0,leak_check_at_exit=0, \
# LD_PRELOAD=/usr/lib/gcc/x86_64-linux-gnu/6.3.0/libasan.so ...
#
# You want to disable the final check because it rudely exits whenever
# a leak is detected. We could call `do_leak_check` in an atexit hook
# while .so symbols are around, but that still exits the process...
ENABLED = "ASAN_OPTIONS" in os.environ and "LD_PRELOAD" in os.environ
if ENABLED:
FFI = cffi.FFI()
FFI.cdef(ASAN_DECLARATIONS)
ASAN = FFI.dlopen(None)
# This dictionary maps leak description to occurrence count.
#
# Leaks should be monotonically increasing, so this lets us
# detect when a new leak was introduced since the last call
# to _scan_for_interesting_leaks.
#
# A set should suffice, since LSan does error merging, but
# it doesn't hurt to be conservative here.
_KNOWN_LEAKS = dict() # leak -> counter
# LSan reports to stderr.
REPORT_FD = 2
INTERESTING_LIBRARIES = set()
INTERESTING_LEAK_PATTERN = None
def add_interesting_library(library):
global INTERESTING_LEAK_PATTERN
INTERESTING_LIBRARIES.add(library)
any_lib_pattern = b"|".join(
b"(" + re.escape(bytes(lib, "utf-8")) + b")"
for lib in sorted(INTERESTING_LIBRARIES)
)
INTERESTING_LEAK_PATTERN = re.compile(r"^\s+#\d+.*" + any_lib_pattern)
def _scan_for_interesting_leaks():
"""Scans for new leaks with interesting libraries in the backtrace."""
if not ENABLED:
return []
pattern = INTERESTING_LEAK_PATTERN
leaks = []
with tempfile.TemporaryFile() as temp:
try:
saved_fd = os.dup(REPORT_FD)
os.dup2(temp.fileno(), REPORT_FD)
ASAN.__lsan_do_recoverable_leak_check()
finally:
os.dup2(saved_fd, REPORT_FD)
os.close(saved_fd)
known_leaks = dict(_KNOWN_LEAKS)
temp.seek(0)
acc = ""
must_dump = False
for line in temp:
line = str(line, "utf-8")[:-1]
# End of the report section.
if re.match(r"^\s*$", line):
if acc in known_leaks:
known_leaks[acc] -= 1
if known_leaks[acc] == 0:
del known_leaks[acc]
else:
_KNOWN_LEAKS[acc] = 1 + _KNOWN_LEAKS.get(acc, 0)
if must_dump:
leaks.append(acc)
acc = ""
must_dump = False
else:
acc += line + "\n"
if pattern and re.match(pattern, line):
must_dump = True
return leaks
def reset_leaks():
"""Updates the internal list of known leaks to take into account
everything that's already leaked."""
if not ENABLED:
return
_scan_for_interesting_leaks()
def assert_no_leak():
"""When ASan/LSan is enabled, asserts that no new interesting allocations
have been leaked since the last call to assert_no_leak.
"""
if not ENABLED:
return
leaks = _scan_for_interesting_leaks()
assert not leaks, "New leaks:\n%s" % "\n".join(leaks)
IN_LEAKY_REGION = False
@contextlib.contextmanager
def leaky_region(leaky=True):
"""Marks a dynamic extent as expected to leak. Use this as
with leaky_region():
known
leaky
code
to disable allocation tracking in the with block.
"""
global IN_LEAKY_REGION
previous_state = IN_LEAKY_REGION
leaky = leaky and ENABLED
try:
if leaky:
ASAN.__lsan_disable()
IN_LEAKY_REGION = True
yield None
finally:
IN_LEAKY_REGION = previous_state
if leaky:
ASAN.__lsan_enable()
LAST_ASAN_ERROR = None
def _filter_lsan_errors(string):
acc = ""
active = False
for line in string.splitlines(keepends=False):
if re.match(r".*=+\d+=+ERROR: ", line):
active = not re.match(r".*=+ERROR: LeakSanitizer: ", line)
if active:
acc += line + "\n"
if re.match(r"^\W*=+\W*$", line):
active = False
return acc
def wrap_with_tracking(fn):
"""Wraps any callable value with a try/finally block to locally enable
allocation tracking.
If ASan is disabled or the value is not a function, this is the
identity.
"""
if not ENABLED or not callable(fn):
return fn
@functools.wraps(fn)
def wrapper(*args, **kwargs):
global LAST_ASAN_ERROR
check_leaks = not IN_LEAKY_REGION
try:
LAST_ASAN_ERROR = None
if check_leaks:
ASAN.__lsan_enable()
return fn(*args, **kwargs)
finally:
if check_leaks:
ASAN.__lsan_disable()
error = LAST_ASAN_ERROR
LAST_ASAN_ERROR = None
assert error is None, "ASan error: %s" % _filter_lsan_errors(
str(error, "utf-8")
)
return wrapper
class TestCase(unittest.TestCase):
def execute_example(self, f):
reset_leaks()
result = f()
assert_no_leak()
return result
class RuleBasedStateMachine(hypothesis.stateful.RuleBasedStateMachine):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
reset_leaks()
old_teardown = self.teardown
def new_teardown():
old_teardown()
assert_no_leak()
# Try to minimise false leak alarms by only checking for leaks
# after teardown, i.e., when there should be no state left.
# Checking here also guarantees that no example leaks.
self.teardown = new_teardown
if ENABLED:
ASAN.__lsan_disable()
reset_leaks()
# Always check for leaks before shutdown.
atexit.register(assert_no_leak)
@FFI.callback("void(const char*)")
def asan_error_callback(message):
global LAST_ASAN_ERROR
# Save the first error in a callback: the rest might be
# blowback from that initial problem.
if LAST_ASAN_ERROR is None:
LAST_ASAN_ERROR = FFI.string(message)
ASAN.__asan_set_error_report_callback(asan_error_callback)
import struct
import sys
from hypothesis import settings
from hypothesis.stateful import initialize, precondition, rule
import hypothesis.strategies as st
from llvm_one_input import C
import sanitizers
class GrammarTester(sanitizers.RuleBasedStateMachine):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
with sanitizers.leaky_region():
assert C.LLVMFuzzerTestOneInput(b"", 0) == 0
self.last_len = 0
self.buf = b""
def teardown(self):
print("CHECK")
sys.stdout.flush()
assert C.LLVMFuzzerTestOneInput(self.buf, len(self.buf)) == 0
@initialize(
tls_ver=st.sampled_from([1, 2, 3]),
payload=st.binary(),
padding=st.binary(min_size=16),
)
def add_heartbeat(self, tls_ver, payload, padding):
hb_payload = bytes([0x01]) + struct.pack(">H", len(payload)) + payload + padding
self.buf += bytes([0x18, 0x03, tls_ver])
self.buf += struct.pack(">H", len(hb_payload))
self.buf += hb_payload
@precondition(lambda self: self.buf)
@rule(data=st.data(), value=st.integers(min_value=0, max_value=255))
def replace_byte(self, data, value):
index = data.draw(st.integers(min_value=0, max_value=len(self.buf) - 1))
prefix = self.buf[0:index]
suffix = self.buf[index + 1 :]
self.buf = prefix + bytes([value]) + suffix
@precondition(lambda self: self.buf)
@rule(data=st.data())
def strip_suffix(self, data):
count = data.draw(st.integers(min_value=1, max_value=len(self.buf)))
self.buf = self.buf[0:-count]
@rule(suffix=st.binary(min_size=1))
def add_suffix(self, suffix):
self.buf += suffix
TestWithGrammar = GrammarTester.TestCase
TestWithGrammar.settings = settings(max_examples=10000, deadline=None)
import struct
import sys
from hypothesis import settings, target
from hypothesis.stateful import initialize, precondition, rule
import hypothesis.strategies as st
import bts
from llvm_one_input import C
import sanitizers
class GrammarBtsTester(sanitizers.RuleBasedStateMachine):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
bts.reset()
with sanitizers.leaky_region():
assert C.LLVMFuzzerTestOneInput(b"", 0) == 0
bts.update_useless_edges()
bts.reset()
self.last_len = 0
self.buf = b""
def teardown(self):
print("CHECK")
sys.stdout.flush()
assert C.LLVMFuzzerTestOneInput(self.buf, len(self.buf)) == 0
for i in bts.report():
target(1.0, str(i))
bts.teardown()
@initialize(
tls_ver=st.sampled_from([1, 2, 3]),
payload=st.binary(),
padding=st.binary(min_size=16),
)
def add_heartbeat(self, tls_ver, payload, padding):
hb_payload = bytes([0x01]) + struct.pack(">H", len(payload)) + payload + padding
self.buf += bytes([0x18, 0x03, tls_ver])
self.buf += struct.pack(">H", len(hb_payload))
self.buf += hb_payload
@precondition(lambda self: self.buf)
@rule(data=st.data(), value=st.integers(min_value=0, max_value=255))
def replace_byte(self, data, value):
index = data.draw(st.integers(min_value=0, max_value=len(self.buf) - 1))
prefix = self.buf[0:index]
suffix = self.buf[index + 1 :]
self.buf = prefix + bytes([value]) + suffix
@precondition(lambda self: self.buf)
@rule(data=st.data())
def strip_suffix(self, data):
count = data.draw(st.integers(min_value=1, max_value=len(self.buf)))
self.buf = self.buf[0:-count]
@rule(suffix=st.binary(min_size=1))
def add_suffix(self, suffix):
self.buf += suffix
TestWithGrammarBts = GrammarBtsTester.TestCase
TestWithGrammarBts.settings = settings(max_examples=1000, deadline=None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment