MIT-licensed support code for coverage-guided Hypothesis testing
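In order: bts.py wraps per-thread Intel BTS branch tracing behind a small C shim; an example shows the same wrapping applied to cffi bindings for an internal library (crdb); the MIT license; llvm_one_input.py loads any shared object with a libFuzzer entry point; a shell script runs pytest under ASan; sanitizers.py hooks LSan/ASan leak checking into Hypothesis; and two stateful grammar tests drive the OpenSSL 1.0.1f heartbeat target, the second steering Hypothesis with BTS edge coverage.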
bts.py: per-thread branch tracing with Intel BTS (Branch Trace Store), via a small C shim loaded with cffi.
import functools
import os
import re
import threading
import time

import cffi

__all__ = ["teardown", "reset", "report", "wrap_fn"]

SELF_DIR = os.path.dirname(os.path.abspath(__file__))
BTS_DECLARATIONS = """
struct bts_aux_record {
        uint64_t from_addr;  /* from and to are instruction addresses. */
        uint64_t to_addr;
        uint64_t flags;  /* 0x10 = predicted, in theory, maybe. */
};

/*
 * This function must be called with the value in
 * `/sys/bus/event_source/devices/intel_bts/type` before calling
 * `bts_setup`.
 */
void bts_init(int detected_bts_perf_type);

/*
 * Cleans up any BTS state for the current thread.
 */
void bts_teardown(void);

/*
 * Overwrites or creates the BTS state for the current thread, with
 * an auxiliary (tracing) buffer of `aux_size` bytes. `aux_size`
 * must be a power of two and at least one page.
 *
 * Returns 0 on success, negative on failure.
 */
int bts_setup(size_t aux_size);

/*
 * Enables branch tracing for the calling thread, which must have
 * a BTS state (i.e., only call `bts_start` after `bts_setup`).
 *
 * Returns 0 on success, negative on failure.
 */
int bts_start(void);

/*
 * Stops branch tracing for the current thread, and returns a
 * temporary (thread-local) buffer of the BTS records accumulated
 * since the last call to `bts_start`.
 *
 * The first argument is overwritten with the number of valid
 * records in the return value, or a negative count on error.
 *
 * When `(*OUT_num_elements + 2) * sizeof(struct bts_aux_record)`
 * exceeds the `aux_size` passed to `bts_setup`, tracing may have
 * exhausted the buffer space and stopped early. This trace
 * truncation does not affect the execution of the traced program.
 */
const struct bts_aux_record *bts_stop(ssize_t *OUT_num_elements);
"""
DEFAULT_AUX_SIZE = 2 ** 25

FFI = None
BTS = None
ENABLED = False
BTS_TYPE = None
BTS_TYPE_PATH = "/sys/bus/event_source/devices/intel_bts/type"

# Tracing is only enabled when the kernel exposes the intel_bts
# perf event source.
try:
    with open(BTS_TYPE_PATH, "r") as f:
        BTS_TYPE = int(f.read())
        ENABLED = True
except (OSError, ValueError):
    pass
def _init_bts():
    BTS.bts_init(BTS_TYPE)


if ENABLED:
    FFI = cffi.FFI()
    FFI.cdef(BTS_DECLARATIONS)
    BTS = FFI.dlopen(SELF_DIR + "/libbts.so")
    FFI.init_once(_init_bts, "init_bts")
def find_current_mappings():
    """Lists the executable (r-xp) mappings currently in /proc/self/maps."""
    ret = []
    with open("/proc/self/maps", "r") as f:
        for line in f:
            m = re.match(r"^([0-9a-f]+)-([0-9a-f]+) r-xp .*", line)
            if m:
                mapping = (int(m.group(1), 16), int(m.group(2), 16))
                ret.append(mapping)
    return ret


# Executable mappings present at import time (the interpreter and its
# libraries); branch edges in these are noise, not test coverage.
BASELINE_MAPPINGS = find_current_mappings()


def address_in_baseline_map(x):
    for mapping in BASELINE_MAPPINGS:
        if mapping[0] <= x < mapping[1]:
            return True
    return False
FULLY_SETUP = threading.local()


def teardown():
    if not ENABLED:
        return
    BTS.bts_teardown()
    FULLY_SETUP.setup = False


def ensure_setup(buffer_size=DEFAULT_AUX_SIZE):
    if not ENABLED or getattr(FULLY_SETUP, "setup", None):
        return
    assert BTS.bts_setup(buffer_size) == 0
    FULLY_SETUP.setup = True
EDGE_BUFFER = threading.local()


def reset(buffer_size=DEFAULT_AUX_SIZE):
    if not ENABLED:
        return
    EDGE_BUFFER.buffer = []
    EDGE_BUFFER.call_count = 0
    assert BTS.bts_setup(buffer_size) == 0
    FULLY_SETUP.setup = True


MIN_ADDRESS = 2 ** 12
MAX_ADDRESS = 2 ** 63 - 1
ALL_SEEN_EDGES = dict()
USELESS_EDGES = set()
initial_time = time.time()
def hash_report(od_pairs):
    """Sketches the *unique* origin/destination pairs into an array of values."""
    ret = list()
    seen = set(USELESS_EDGES)
    for pair in od_pairs:
        # Skip kernel addresses (top bit set) and anything below the
        # first page.
        if pair[0] > MAX_ADDRESS or pair[1] > MAX_ADDRESS:
            continue
        if pair[0] < MIN_ADDRESS or pair[1] < MIN_ADDRESS:
            continue
        if pair in seen:
            continue
        if address_in_baseline_map(pair[0]) or address_in_baseline_map(pair[1]):
            continue
        if pair not in ALL_SEEN_EDGES:
            print(
                "%f new edge %i %s"
                % (
                    time.time() - initial_time,
                    len(ALL_SEEN_EDGES),
                    (hex(pair[0]), hex(pair[1])),
                )
            )
            ALL_SEEN_EDGES[pair] = len(ALL_SEEN_EDGES)
        seen.add(pair)
        ret.append(ALL_SEEN_EDGES[pair])
    return ret
def update_useless_edges():
    """Marks every edge traced since the last start/reset as uninteresting,
    e.g., because even a trivial baseline input reaches them."""
    if not ENABLED:
        return
    num = FFI.new("ssize_t *")
    ret = BTS.bts_stop(num)
    for i in range(num[0]):
        USELESS_EDGES.add((ret[i].from_addr, ret[i].to_addr))
    for pair in getattr(EDGE_BUFFER, "buffer", []):
        USELESS_EDGES.add(pair)
def report():
    """Returns the number of wrapped calls since the last reset and the
    sketch of new interesting edges; (1, []) when tracing is disabled."""
    if not ENABLED:
        return 1, []
    num = FFI.new("ssize_t *")
    ret = BTS.bts_stop(num)
    od_pairs = [(ret[i].from_addr, ret[i].to_addr) for i in range(num[0])] + getattr(
        EDGE_BUFFER, "buffer", []
    )
    call_count = max(1, getattr(EDGE_BUFFER, "call_count", 0))
    return call_count, hash_report(od_pairs)
def _start():
    if not ENABLED or not getattr(FULLY_SETUP, "setup", None):
        return
    BTS.bts_start()


def _stop():
    if not ENABLED or not getattr(FULLY_SETUP, "setup", None):
        return
    if getattr(EDGE_BUFFER, "buffer", None) is None:
        EDGE_BUFFER.buffer = []
    EDGE_BUFFER.call_count = 1 + getattr(EDGE_BUFFER, "call_count", 0)
    num = FFI.new("ssize_t *")
    ret = BTS.bts_stop(num)
    for i in range(num[0]):
        EDGE_BUFFER.buffer.append((ret[i].from_addr, ret[i].to_addr))
def wrap_fn(fn):
    """Wraps a callable so branch tracing is enabled for the duration of
    each call; the identity when BTS is unavailable."""
    if not ENABLED or not callable(fn):
        return fn

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            _start()
            return fn(*args, **kwargs)
        finally:
            _stop()

    return wrapper
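A minimal sketch of how the module is meant to be driven, assuming libbts.so has been built next to this file and the kernel exposes intel_bts; `parse` here is a hypothetical stand-in for a cffi-bound native call:

    import bts

    def parse(buf):
        return len(buf)  # stand-in for a cffi-bound native call

    traced = bts.wrap_fn(parse)  # identity when BTS is unavailable

    bts.reset()                  # (re)creates this thread's trace buffer
    traced(b"some input")        # branch edges accumulate across wrapped calls
    calls, edges = bts.report()  # edges are small integers naming new pairs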
Bindings for an internal library (crdb), showing how to strip C headers for cffi and wrap every exported symbol with the tracking hooks:
import faulthandler
import os
import re
from types import SimpleNamespace

import sanitizers

import cffi

SELF_DIR = os.path.dirname(os.path.abspath(__file__))
TOPLEVEL = os.path.abspath(SELF_DIR + "/../../../../") + "/"

PREDEFINITIONS = """
typedef ... regex_t;

/*
 * While pycparser supports __int128, cffi does not. Provide these
 * fake definitions for __[u]int128_t; they look the same to the ABI
 * (in as much as there is an ABI for this extension).
 */
typedef struct {
        uint64_t lo;
        uint64_t hi;
} __uint128_t;

typedef struct {
        uint64_t lo;
        int64_t hi;
} __int128_t;
"""
REPLACEMENTS = {
    r"STAILQ_HEAD\s*\(\s*(\w*)\s*,\s*(\w+)\s*\)": r"struct \1 { struct \2 *stqh_first; struct \2 **stqh_last; }",
    r"STAILQ_ENTRY\s*\(\s*(\w+)\s*\)": r"struct { struct \1 *stqe_next; }",
}

PUBLIC_HEADERS = [
    # ...
]

INTERNAL_HEADERS = [
    # ...
]

STRIPPED_PREFIXES = [
    "crdb_",
    "CRDB_",
]
FFI = cffi.FFI()


def read_stripped_header(path):
    """Returns the contents of a header file without preprocessor directives."""
    ret = ""
    in_directive = False
    with open(path) as f:
        for line in f:
            if in_directive or re.match(r"^\s*#", line):
                in_directive = line.endswith("\\\n")
            else:
                in_directive = False
                for pattern, repl in REPLACEMENTS.items():
                    line = re.sub(pattern, repl, line)
                ret += line
    return ret
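# For example, with REPLACEMENTS as above, a sys/queue.h declaration like
#
#     STAILQ_HEAD(entry_list, entry);
#
# is rewritten to
#
#     struct entry_list { struct entry *stqh_first; struct entry **stqh_last; };
#
# i.e., the macro's expansion, which cffi's cdef parser can accept.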
FFI.cdef(PREDEFINITIONS)
for header in PUBLIC_HEADERS:
    FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/" + header))
for header in INTERNAL_HEADERS:
    FFI.cdef(read_stripped_header(TOPLEVEL + "src/crdb/include/crdb/" + header))

if sanitizers.ENABLED:
    C = FFI.dlopen(TOPLEVEL + "_build_asan/crdb/libcrdb.so")
else:
    C = FFI.dlopen(TOPLEVEL + "_build/crdb/libcrdb.so")


def _strip_prefixes(lib):
    """Returns a namespace with every symbol in `lib` resolved eagerly,
    and with aliases stripped of the `crdb_` prefix (and other entries
    in STRIPPED_PREFIXES).
    """
    ret = SimpleNamespace()
    symbols = dir(lib)
    for symbol in symbols:
        # This is where we would wrap with BTS hooks.
        fn = sanitizers.wrap_with_tracking(getattr(lib, symbol))
        setattr(ret, symbol, fn)
        for prefix in STRIPPED_PREFIXES:
            if symbol.startswith(prefix):
                suffix = symbol[len(prefix):]
                assert (
                    suffix not in symbols
                ), "Name collision when stripping %s from %s." % (prefix, symbol)
                setattr(ret, suffix, fn)
                break
    return ret


C = _strip_prefixes(C)


def _init_crdb():
    error = FFI.new("crdb_error_t *")
    assert C.init(FFI.NULL, error), "crdb_init: %s %i" % (
        FFI.string(error.message),
        error.error,
    )


FFI.init_once(_init_crdb, "init_crdb")

# Pass in a copy of stderr in case anyone plays redirection tricks.
faulthandler.enable(os.dup(2))
LICENSE:
Copyright 2020 Paul Khuong, Backtrace I/O, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
llvm_one_input.py: loads the libFuzzer target named by $FUZZ_TARGET and wraps its entry point with the sanitizer and BTS hooks:
import faulthandler
import os
from types import SimpleNamespace

import bts
import sanitizers

import cffi

# We assume a libFuzzer entry point.
DECLARATION = "int LLVMFuzzerTestOneInput(const uint8_t *, size_t);"

assert (
    "FUZZ_TARGET" in os.environ
), "FUZZ_TARGET must point to a .so with a libFuzzer entry point"

FUZZ_TARGET = os.environ["FUZZ_TARGET"]

FFI = cffi.FFI()
FFI.cdef(DECLARATION)
C = FFI.dlopen(FUZZ_TARGET)


def _wrap_symbols(lib):
    ret = SimpleNamespace()
    symbols = dir(lib)
    for symbol in symbols:
        fn = sanitizers.wrap_with_tracking(bts.wrap_fn(getattr(lib, symbol)))
        setattr(ret, symbol, fn)
    return ret


C = _wrap_symbols(C)

# Pass in a copy of stderr in case anyone plays redirection tricks.
faulthandler.enable(os.dup(2))
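Callers simply import the wrapped namespace; each call then runs between the BTS start/stop and LSan enable/disable brackets. A sketch, with an arbitrary input:

    from llvm_one_input import C

    buf = b"example input"
    assert C.LLVMFuzzerTestOneInput(buf, len(buf)) == 0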
Wrapper script to run pytest with ASan preloaded against the openssl-1.0.1f target:
#!/bin/sh
ASAN_OPTIONS=halt_on_error=1,leak_check_at_exit=0 \
    LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libasan.so.5 \
    FUZZ_TARGET=fuzzer-test-suite/openssl-1.0.1f-fsanitize.so \
    pytest --capture=sys "$@"
sanitizers.py: Hypothesis integration with LSan / ASan:
""" | |
Hypothesis integration with LSan / ASan | |
""" | |
import atexit | |
import contextlib | |
import functools | |
import os | |
import re | |
import tempfile | |
import unittest | |
import cffi | |
import hypothesis.stateful | |
__all__ = [ | |
"add_interesting_library", | |
"leaky_region", | |
"wrap_with_tracking", | |
"TestCase", | |
"RuleBasedStateMachine", | |
] | |
# When ASAN / LSAN is enabled, we have some extra instrumentation to | |
# assert against leaks, and locally disable leak detection. | |
# | |
# See https://github.com/gcc-mirror/gcc/blob/releases/gcc-6.3.0/libsanitizer/include/sanitizer/common_interface_defs.h | |
ASAN_DECLARATIONS = """
void __sanitizer_set_report_path(const char *path);

void __asan_set_error_report_callback(void (*callback)(const char*));

void __lsan_disable(void);
void __lsan_enable(void);

void __lsan_do_leak_check(void);
int __lsan_do_recoverable_leak_check(void);
"""

FFI = None
ASAN = None

# In order to load an ASan-instrumented library in a regular python
# binary, we must build the library with dynamic linkage to libasan
# (the default on GCC, `-shared-libasan` for clang), and LD_PRELOAD
# libasan before invoking python.
#
# https://github.com/google/sanitizers/issues/918
#
# In order to enable ASan, ASAN_OPTIONS must be defined, and LD_PRELOAD
# should be set up; for example,
#
#   ASAN_OPTIONS=halt_on_error=0,leak_check_at_exit=0, \
#       LD_PRELOAD=/usr/lib/gcc/x86_64-linux-gnu/6.3.0/libasan.so ...
#
# You want to disable the final check because it rudely exits whenever
# a leak is detected. We could call `do_leak_check` in an atexit hook
# while .so symbols are around, but that still exits the process...

ENABLED = "ASAN_OPTIONS" in os.environ and "LD_PRELOAD" in os.environ

if ENABLED:
    FFI = cffi.FFI()
    FFI.cdef(ASAN_DECLARATIONS)
    ASAN = FFI.dlopen(None)
# This dictionary maps leak description to occurrence count.
#
# Leaks should be monotonically increasing, so this lets us
# detect when a new leak was introduced since the last call
# to _scan_for_interesting_leaks.
#
# A set should suffice, since LSan does error merging, but
# it doesn't hurt to be conservative here.
_KNOWN_LEAKS = dict()  # leak -> counter

# LSan reports to stderr.
REPORT_FD = 2

INTERESTING_LIBRARIES = set()
INTERESTING_LEAK_PATTERN = None


def add_interesting_library(library):
    """Registers a library name; leaks with that name in the backtrace
    are interesting."""
    global INTERESTING_LEAK_PATTERN

    INTERESTING_LIBRARIES.add(library)
    any_lib_pattern = "|".join(
        "(" + re.escape(lib) + ")" for lib in sorted(INTERESTING_LIBRARIES)
    )
    INTERESTING_LEAK_PATTERN = re.compile(r"^\s+#\d+.*(?:" + any_lib_pattern + ")")
def _scan_for_interesting_leaks():
    """Scans for new leaks with interesting libraries in the backtrace."""
    if not ENABLED:
        return []
    pattern = INTERESTING_LEAK_PATTERN
    leaks = []
    with tempfile.TemporaryFile() as temp:
        # Redirect LSan's report to a temporary file around a
        # recoverable leak check.
        saved_fd = os.dup(REPORT_FD)
        try:
            os.dup2(temp.fileno(), REPORT_FD)
            ASAN.__lsan_do_recoverable_leak_check()
        finally:
            os.dup2(saved_fd, REPORT_FD)
            os.close(saved_fd)
        known_leaks = dict(_KNOWN_LEAKS)
        temp.seek(0)
        acc = ""
        must_dump = False
        for line in temp:
            line = str(line, "utf-8")[:-1]
            # A blank line ends the current report section.
            if re.match(r"^\s*$", line):
                if acc in known_leaks:
                    known_leaks[acc] -= 1
                    if known_leaks[acc] == 0:
                        del known_leaks[acc]
                else:
                    _KNOWN_LEAKS[acc] = 1 + _KNOWN_LEAKS.get(acc, 0)
                    if must_dump:
                        leaks.append(acc)
                acc = ""
                must_dump = False
            else:
                acc += line + "\n"
                if pattern and re.match(pattern, line):
                    must_dump = True
    return leaks
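# LSan leak records look roughly like (addresses illustrative)
#
#     Direct leak of 512 byte(s) in 1 object(s) allocated from:
#         #0 0x7f0a... in malloc
#         #1 0x7f0a... in crdb_open .../libcrdb.so+0x1234
#
# each followed by a blank line; the scanner above keys on that blank
# line to delimit records, and only reports records where some `#N`
# frame mentions an interesting library.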
def reset_leaks():
    """Updates the internal list of known leaks to take into account
    everything that has already leaked."""
    if not ENABLED:
        return
    _scan_for_interesting_leaks()


def assert_no_leak():
    """When ASan/LSan is enabled, asserts that no new interesting allocations
    have been leaked since the last call to assert_no_leak.
    """
    if not ENABLED:
        return
    leaks = _scan_for_interesting_leaks()
    assert not leaks, "New leaks:\n%s" % "\n".join(leaks)
IN_LEAKY_REGION = False


@contextlib.contextmanager
def leaky_region(leaky=True):
    """Marks a dynamic extent as expected to leak. Use this as

        with leaky_region():
            known
            leaky
            code

    to disable allocation tracking in the with block.
    """
    global IN_LEAKY_REGION

    previous_state = IN_LEAKY_REGION
    leaky = leaky and ENABLED
    try:
        if leaky:
            ASAN.__lsan_disable()
            IN_LEAKY_REGION = True
        yield None
    finally:
        IN_LEAKY_REGION = previous_state
        if leaky:
            ASAN.__lsan_enable()
LAST_ASAN_ERROR = None


def _filter_lsan_errors(string):
    """Keeps ASan error sections from a report, dropping LeakSanitizer ones
    (leaks are checked separately)."""
    acc = ""
    active = False
    for line in string.splitlines(keepends=False):
        if re.match(r".*=+\d+=+ERROR: ", line):
            active = not re.match(r".*=+ERROR: LeakSanitizer: ", line)
        if active:
            acc += line + "\n"
        if re.match(r"^\W*=+\W*$", line):
            active = False
    return acc
def wrap_with_tracking(fn):
    """Wraps any callable value with a try/finally block to locally enable
    allocation tracking.

    If ASan is disabled or the value is not a function, this is the
    identity.
    """
    if not ENABLED or not callable(fn):
        return fn

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        global LAST_ASAN_ERROR

        check_leaks = not IN_LEAKY_REGION
        try:
            LAST_ASAN_ERROR = None
            if check_leaks:
                ASAN.__lsan_enable()
            return fn(*args, **kwargs)
        finally:
            if check_leaks:
                ASAN.__lsan_disable()
            error = LAST_ASAN_ERROR
            LAST_ASAN_ERROR = None
            assert error is None, "ASan error: %s" % _filter_lsan_errors(
                str(error, "utf-8")
            )

    return wrapper
class TestCase(unittest.TestCase):
    def execute_example(self, f):
        reset_leaks()
        result = f()
        assert_no_leak()
        return result


class RuleBasedStateMachine(hypothesis.stateful.RuleBasedStateMachine):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        reset_leaks()
        old_teardown = self.teardown

        def new_teardown():
            old_teardown()
            assert_no_leak()

        # Try to minimise false leak alarms by only checking for leaks
        # after teardown, i.e., when there should be no state left.
        # Checking here also guarantees that no example leaks.
        self.teardown = new_teardown


if ENABLED:
    ASAN.__lsan_disable()
    reset_leaks()
    # Always check for leaks before shutdown.
    atexit.register(assert_no_leak)

    @FFI.callback("void(const char*)")
    def asan_error_callback(message):
        global LAST_ASAN_ERROR
        # Save the first error in a callback: the rest might be
        # blowback from that initial problem.
        if LAST_ASAN_ERROR is None:
            LAST_ASAN_ERROR = FFI.string(message)

    ASAN.__asan_set_error_report_callback(asan_error_callback)
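A minimal sketch of the intended use, with a hypothetical library name and test body; it assumes python was started with ASAN_OPTIONS and LD_PRELOAD as described above:

    from hypothesis import given
    import hypothesis.strategies as st

    import sanitizers

    sanitizers.add_interesting_library("libcrdb.so")


    class RoundTripTest(sanitizers.TestCase):
        @given(st.binary())
        def test_roundtrip(self, data):
            # execute_example brackets each Hypothesis example with
            # reset_leaks() / assert_no_leak(), so any new leak whose
            # backtrace mentions libcrdb.so fails that example.
            pass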
A grammar-driven stateful test for the heartbeat fuzz target, with leak checking only:
import struct
import sys

from hypothesis import settings
from hypothesis.stateful import initialize, precondition, rule
import hypothesis.strategies as st

from llvm_one_input import C
import sanitizers


class GrammarTester(sanitizers.RuleBasedStateMachine):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        with sanitizers.leaky_region():
            assert C.LLVMFuzzerTestOneInput(b"", 0) == 0
        self.last_len = 0
        self.buf = b""

    def teardown(self):
        print("CHECK")
        sys.stdout.flush()
        assert C.LLVMFuzzerTestOneInput(self.buf, len(self.buf)) == 0

    @initialize(
        tls_ver=st.sampled_from([1, 2, 3]),
        payload=st.binary(),
        padding=st.binary(min_size=16),
    )
    def add_heartbeat(self, tls_ver, payload, padding):
        # A well-formed heartbeat record: content type 0x18, TLS version
        # 3.x, record length, then message type 0x01, the declared payload
        # length, the payload, and >= 16 bytes of padding. The mutation
        # rules below can corrupt the length fields (the Heartbleed pattern).
        hb_payload = bytes([0x01]) + struct.pack(">H", len(payload)) + payload + padding
        self.buf += bytes([0x18, 0x03, tls_ver])
        self.buf += struct.pack(">H", len(hb_payload))
        self.buf += hb_payload

    @precondition(lambda self: self.buf)
    @rule(data=st.data(), value=st.integers(min_value=0, max_value=255))
    def replace_byte(self, data, value):
        index = data.draw(st.integers(min_value=0, max_value=len(self.buf) - 1))
        prefix = self.buf[0:index]
        suffix = self.buf[index + 1:]
        self.buf = prefix + bytes([value]) + suffix

    @precondition(lambda self: self.buf)
    @rule(data=st.data())
    def strip_suffix(self, data):
        count = data.draw(st.integers(min_value=1, max_value=len(self.buf)))
        self.buf = self.buf[0:-count]

    @rule(suffix=st.binary(min_size=1))
    def add_suffix(self, suffix):
        self.buf += suffix


TestWithGrammar = GrammarTester.TestCase
TestWithGrammar.settings = settings(max_examples=10000, deadline=None)
The same grammar test, feeding BTS edge coverage to hypothesis.target:
import struct
import sys

from hypothesis import settings, target
from hypothesis.stateful import initialize, precondition, rule
import hypothesis.strategies as st

import bts
from llvm_one_input import C
import sanitizers


class GrammarBtsTester(sanitizers.RuleBasedStateMachine):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        bts.reset()
        with sanitizers.leaky_region():
            assert C.LLVMFuzzerTestOneInput(b"", 0) == 0
        # Edges traced while running the trivial input are noise.
        bts.update_useless_edges()
        bts.reset()
        self.last_len = 0
        self.buf = b""

    def teardown(self):
        print("CHECK")
        sys.stdout.flush()
        assert C.LLVMFuzzerTestOneInput(self.buf, len(self.buf)) == 0
        # Feed every newly observed edge to Hypothesis's targeted search.
        _calls, edges = bts.report()
        for i in edges:
            target(1.0, label=str(i))
        bts.teardown()

    @initialize(
        tls_ver=st.sampled_from([1, 2, 3]),
        payload=st.binary(),
        padding=st.binary(min_size=16),
    )
    def add_heartbeat(self, tls_ver, payload, padding):
        # Same well-formed heartbeat record as in the plain grammar test.
        hb_payload = bytes([0x01]) + struct.pack(">H", len(payload)) + payload + padding
        self.buf += bytes([0x18, 0x03, tls_ver])
        self.buf += struct.pack(">H", len(hb_payload))
        self.buf += hb_payload

    @precondition(lambda self: self.buf)
    @rule(data=st.data(), value=st.integers(min_value=0, max_value=255))
    def replace_byte(self, data, value):
        index = data.draw(st.integers(min_value=0, max_value=len(self.buf) - 1))
        prefix = self.buf[0:index]
        suffix = self.buf[index + 1:]
        self.buf = prefix + bytes([value]) + suffix

    @precondition(lambda self: self.buf)
    @rule(data=st.data())
    def strip_suffix(self, data):
        count = data.draw(st.integers(min_value=1, max_value=len(self.buf)))
        self.buf = self.buf[0:-count]

    @rule(suffix=st.binary(min_size=1))
    def add_suffix(self, suffix):
        self.buf += suffix


TestWithGrammarBts = GrammarBtsTester.TestCase
TestWithGrammarBts.settings = settings(max_examples=1000, deadline=None)
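Each edge index becomes its own hypothesis.target label, and any example that covers the edge observes 1.0 for that label. Hypothesis's targeted search favours and mutates high-scoring examples, so new edges keep examples alive much like a coverage-guided fuzzer's corpus, layered on top of plain property-based testing.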