Skip to content

Instantly share code, notes, and snippets.

@andreafioraldi
Last active May 13, 2020 12:23
Show Gist options
  • Save andreafioraldi/d2c21e14a2b4a49a013d2e691b25cb1d to your computer and use it in GitHub Desktop.
Save andreafioraldi/d2c21e14a2b4a49a013d2e691b25cb1d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
Copyright (c) 2020, Andrea Fioraldi
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
import subprocess
import progressbar
import argparse
import shutil
import sys
import os
# ANSI escape sequences used to colour/format terminal output.
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"  # resets all attributes
BOLD = "\033[1m"
UNDERLINE = "\033[4m"

# Text shown at the top of the command-line help.
DESCR = (
    "UBSan minimize set of crashing testcases\n"
    "Copyright (C) 2020 Andrea Fioraldi <andreafioraldi@gmail.com>\n"
)
# Directory containing this script, with symlinks resolved.
dir_path = os.path.dirname(os.path.realpath(__file__))

# Command-line interface: options first, then the target command line.
opt = argparse.ArgumentParser(description=DESCR, formatter_class=argparse.RawTextHelpFormatter)
opt.add_argument("-i", help="Input directory with crashes", action='store', required=True)
opt.add_argument("-q", help="Be quiet", action='store_true')
opt.add_argument("-t", help="Sort by time not by size", action='store_true')
opt.add_argument("-s", help="Show program output runned with the minimized set", action='store_true')
opt.add_argument('target', nargs=argparse.REMAINDER, help="Target program (and arguments)")

args = opt.parse_args()

# NOTE(review): depending on the Python version, argparse may leave an
# explicit "--" separator at the front of a REMAINDER list; drop it so it is
# not mistaken for the program to execute.
if args.target[:1] == ["--"]:
    args.target = args.target[1:]

# Without a target there is nothing to run; fail early with a usage message
# instead of crashing later when the (empty) command is spawned.
if not args.target:
    opt.error("no target program specified")

# Global switch read by warn() and log_progress().
be_quiet = args.q
def get_testcase_time(path):
    """Return a timestamp for the testcase at *path*.

    If *path* contains ``time:`` immediately followed by ASCII digits, that
    number is returned as an ``int``.  Otherwise the file is stat'ed and its
    birth time is returned when the platform exposes ``st_birthtime``, else
    its modification time.

    Raises:
        OSError: if the file has to be stat'ed and cannot be.
    """
    marker = "time:"
    pos = path.find(marker)
    if pos != -1:
        rest = path[pos + len(marker):]
        end = 0
        # Only ASCII digits are accepted: str.isdigit() is also true for
        # characters such as superscripts that int() cannot parse.
        while end < len(rest) and rest[end] in "0123456789":
            end += 1
        if end:
            return int(rest[:end])
        # "time:" without a number: fall back to the filesystem timestamp
        # instead of failing on int("").
    stat = os.stat(path)
    try:
        return stat.st_birthtime
    except AttributeError:
        return stat.st_mtime
def run(argv, stdin_file=None, print_output=False):
    """Execute *argv* and wait for it to finish.

    Args:
        argv: command line (program and arguments) passed to ``Popen``.
        stdin_file: optional path of a file whose bytes are fed to the
            child's standard input.  When omitted the child sees an
            immediate end-of-file on stdin.
        print_output: when true, the child's stdout/stderr are inherited
            from this process and nothing is captured.

    Returns:
        The child's combined stdout and stderr as ``bytes``, or ``None``
        when *print_output* is true.
    """
    content = None
    if stdin_file:
        with open(stdin_file, "rb") as f:
            content = f.read()

    if print_output:
        redirect = {}
    else:
        redirect = {"stdout": subprocess.PIPE, "stderr": subprocess.STDOUT}

    p = subprocess.Popen(argv, stdin=subprocess.PIPE, close_fds=True, **redirect)
    # communicate() writes the input, closes stdin and drains the output
    # pipe.  Writing to p.stdin by hand never closed the pipe in print mode
    # (a child reading until EOF hung forever) and could deadlock in capture
    # mode once the pipe buffers filled up.
    o, _ = p.communicate(input=content)
    return o
def warn(x):
    """Print the message *x* as a coloured warning unless quiet mode is on."""
    if be_quiet:
        return
    # Flush pending stdout output first so the warning is emitted after it.
    sys.stdout.flush()
    message = "".join(["\n", WARNING, "[warning] ", x, ENDC])
    print(message)
def log_progress(x):
    """Return *x* unchanged in quiet mode, otherwise wrapped in a progress bar."""
    return x if be_quiet else progressbar.progressbar(x)
# Crashing testcases grouped per unique bug; both mappings are filled while
# the crash directory is processed and read when the report is printed.
ubsan_bugs, asan_bugs = {}, {}
class UbsanCrash(object):
    """One UBSan runtime error triggered by one testcase.

    Two instances are equal (and hash alike) when they share the same
    source location, regardless of testcase or error type.
    """

    def __init__(self, type_, loc, path, size):
        """Store the error description and look up the testcase timestamp.

        Args:
            type_: error text following ``runtime error:``.
            loc: location text preceding ``runtime error:``.
            path: path of the testcase that triggered the error.
            size: size of the testcase in bytes.
        """
        self.type = type_
        self.loc = loc
        self.path = path
        self.size = size
        self.time = get_testcase_time(path)

    def __repr__(self):
        return "%s(type=%r, loc=%r, path=%r)" % (
            type(self).__name__, self.type, self.loc, self.path)

    def __hash__(self):
        return hash(self.loc)

    def __eq__(self, o):
        # Let Python fall back to its default comparison for foreign types
        # instead of raising AttributeError on the missing attribute.
        if not isinstance(o, UbsanCrash):
            return NotImplemented
        return self.loc == o.loc
class AsanCrash(object):
    """One ASan report triggered by one testcase.

    Two instances are equal (and hash alike) when their stack traces are
    identical, frame by frame.
    """

    def __init__(self, type_, trace, loc, path, size):
        """Store the report description and look up the testcase timestamp.

        Args:
            type_: text following ``ERROR: AddressSanitizer:``.
            trace: sequence of frame addresses of the reported stack.
            loc: textual description of the top frame.
            path: path of the testcase that triggered the report.
            size: size of the testcase in bytes.
        """
        self.type = type_
        self.trace = trace
        self.size = size
        self.loc = loc
        self.path = path
        self.time = get_testcase_time(path)

    def __repr__(self):
        return "%s(type=%r, loc=%r, path=%r)" % (
            type(self).__name__, self.type, self.loc, self.path)

    def __hash__(self):
        return hash(tuple(self.trace))

    def __eq__(self, o):
        # Let Python fall back to its default comparison for foreign types
        # instead of raising AttributeError on the missing attribute.
        if not isinstance(o, AsanCrash):
            return NotImplemented
        return self.trace == o.trace
def hash_callstack(s):
    """Return an integer key identifying the call stack *s*.

    *s* is an iterable of frame addresses (integers).  The key depends on
    both the addresses and their order.  Folding the addresses with XOR, as
    was done before, gave the same key to any permutation of a stack and let
    repeated frames (e.g. recursion) cancel each other out, so unrelated
    crashes were merged into one bug.
    """
    return hash(tuple(s))
def _parse_sanitizer_output(out):
    """Extract ASan and UBSan findings from the captured output *out* (bytes).

    Returns a tuple ``(asan_type, first_frame, stacktrace, ubsan_hits)``:

    * ``asan_type``   - text after ``ERROR: AddressSanitizer:`` ("" if none);
    * ``first_frame`` - text of the ``#0`` frame of the first stack trace;
    * ``stacktrace``  - frame addresses (ints) of the first stack trace;
    * ``ubsan_hits``  - dict mapping each UBSan error location to its error
      text, in order of first appearance.
    """
    asan_bar = b"================================================================="
    asan_error = b"ERROR: AddressSanitizer:"
    ubsan_error = b": runtime error: "

    asan_type = ""
    first_frame = ""
    stacktrace = []
    ubsan_hits = {}
    start_asan = False  # an ASan banner was seen, its trace not yet started
    in_trace = True     # frame lines are currently being accepted

    for line in out.split(b"\n"):
        if line.startswith(asan_bar):
            start_asan = True
            continue

        if start_asan and asan_error in line:
            # Target output is arbitrary bytes: never let decoding fail.
            asan_type = line.partition(asan_error)[2].strip().decode("utf-8", "replace")
            continue

        # A frame line is indented and its first field starts with '#'.
        # The amount of indentation is deliberately not fixed.
        fields = line.split()
        looks_like_frame = (line[:1].isspace() and bool(fields)
                            and fields[0].startswith(b"#"))
        if (start_asan or in_trace) and looks_like_frame:
            in_trace = True
            start_asan = False
            try:
                address = int(fields[1], 0)
            except (IndexError, ValueError):
                continue  # malformed frame line: ignore it
            if fields[0] == b"#0":
                first_frame = b" ".join(fields[1:]).decode("utf-8", "replace")
            stacktrace.append(address)
            continue

        # Any other line ends the current trace.  Unlike before, the line is
        # still examined below, so a UBSan error on the very first output
        # line (or right after a trace) is no longer silently dropped.
        in_trace = False

        if ubsan_error in line:
            loc, _, type_ = line.partition(ubsan_error)
            # Keep only the first error reported for each location.
            ubsan_hits.setdefault(loc.decode("utf-8", "replace"),
                                  type_.decode("utf-8", "replace"))

    return asan_type, first_frame, stacktrace, ubsan_hits


# Run the target once per testcase and group the testcases by the bug they hit.
dirpath = args.i
# Sorted so that processing (and therefore tie-breaking) is deterministic.
for fname in log_progress(sorted(os.listdir(dirpath))):
    path = os.path.join(dirpath, fname)
    if not os.path.isfile(path):
        continue  # sub-directories and the like are not testcases
    size = os.path.getsize(path)

    # "@@" in the target command line is replaced by the testcase path;
    # without it the testcase is fed through standard input.
    argv = [path if a == "@@" else a for a in args.target]
    stdin_file = None if "@@" in args.target else path

    out = run(argv, stdin_file)
    asan_type, first_st, stacktrace, ubsan_hits = _parse_sanitizer_output(out)

    if not ubsan_hits and not stacktrace:
        warn(path + " does not trigger any violation!")

    if stacktrace:
        st = hash_callstack(stacktrace)
        asan_bugs.setdefault(st, []).append(
            AsanCrash(asan_type, stacktrace, first_st, path, size))

    for loc, type_ in ubsan_hits.items():
        ubsan_bugs.setdefault(loc, []).append(UbsanCrash(type_, loc, path, size))
# Width of the separator lines; determined lazily when the first bug is printed.
term_w = 0


def _report_minimized(candidates):
    """Print the best testcase of one bug and optionally replay it.

    *candidates* is the list of crash objects recorded for a single bug.  It
    is sorted in place (by time with ``-t``, by size otherwise) and its first
    element is reported.
    """
    global term_w
    candidates.sort(key=(lambda c: c.time) if args.t else (lambda c: c.size))
    best = candidates[0]

    if term_w == 0:
        # The fallback is only used when the terminal size cannot be queried.
        fallback_w = 19 + max(len(best.path), len(best.loc), len(best.type))
        term_w, _ = shutil.get_terminal_size((fallback_w, 20))

    bar = HEADER + "=" * term_w + ENDC
    print(bar)
    print(BOLD + " Error type : " + best.type + ENDC)
    print(BOLD + " Error location : " + best.loc + ENDC)
    print(BOLD + " Testcase path : " + best.path + ENDC)
    print(BOLD + " Testcase size : " + str(best.size) + ENDC)
    print(bar)

    # Replaying the target is only useful when its output is shown (-s);
    # running it just to discard the captured output was wasted work.
    if args.s:
        argv = [best.path if a == "@@" else a for a in args.target]
        stdin_file = None if "@@" in args.target else best.path
        run(argv, stdin_file, True)


# UBSan bugs are keyed by their location, ASan bugs by their stack hash; the
# reported location is taken from the selected crash object in both cases.
for s in ubsan_bugs:
    _report_minimized(ubsan_bugs[s])
for s in asan_bugs:
    _report_minimized(asan_bugs[s])

print ("Unique UBSan violations :", len(ubsan_bugs))
print ("Unique ASan violations :", len(asan_bugs))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment