-
-
Save jubnzv/827f06a16e2127c1bfed17de0c139619 to your computer and use it in GitHub Desktop.
Triage script for Solidity fuzzing campaign: https://nowarp.io/blog/compiler-testing-part-1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Triage AFL++ crash inputs against the Solidity compiler fuzzer. | |
| Usage: | |
| ./triage.py [output_dir] [--binary PATH] [-j N] [--show-input] | |
| Defaults: | |
| output_dir: out/ | |
| binary: auto-detected per crash (build/harness_sol or build/harness_json) | |
| Automatically discovers crashes/ dirs under out/sol/ and out/json/ for all | |
| workers. Also works when pointed at a single crashes/ dir. | |
| Set HARNESS_VERBOSE=1 to see BUG: output during manual replay: | |
| HARNESS_VERBOSE=1 ./build/harness_sol < crash_file 2>&1 | |
| Features: | |
| - Persistent JSON cache (incremental re-runs) | |
| - Bug classification via reports/triage_state.json (TRIAGED/KNOWN/NEW) | |
| - Parallel reproduction with -j N | |
| - Progress bar with live counters | |
| - Bug normalisation for dedup | |
| - KeyboardInterrupt prints partial results | |
| - Confirmed crashes saved to <output>/confirmed/ with .bug/.loc sidecars | |
| ======================================================================= | |
| Reproducing via the public solc CLI (for upstream-facing reports) | |
| ======================================================================= | |
| For crashes found by harness_sol (raw .sol source input): | |
| # With optimiser (covers most bugs): | |
| solc --optimize --bin /path/to/crash_file | |
| # Without: | |
| solc --bin /path/to/crash_file | |
| # viaIR path: | |
| solc --optimize --via-ir --bin /path/to/crash_file | |
| For crashes found by harness_json (Standard JSON input): | |
| solc --standard-json < /path/to/crash_file | |
| The version of solc must match the submodule HEAD: | |
| cd third-party/solidity && git log --oneline -1 | |
| ======================================================================= | |
| Internal harness replay (never in upstream reports) | |
| ======================================================================= | |
| HARNESS_VERBOSE=1 ./build/harness_sol < crash_file 2>&1 | |
| HARNESS_VERBOSE=1 ./build/harness_json < crash_file 2>&1 | |
| """ | |
| import argparse | |
| import dataclasses | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| from collections import defaultdict | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| # --------------------------------------------------------------------------- | |
| # Constants & regexes | |
| # --------------------------------------------------------------------------- | |
# Default per-crash replay timeout, in seconds.
DEFAULT_TIMEOUT = 30

# The harnesses print a "BUG: ..." marker when HARNESS_VERBOSE=1.
# BUG_RE keeps only the first line of that marker.
BUG_RE = re.compile(r"^BUG: (.+)$", re.MULTILINE)
# BUG_BLOCK_RE keeps everything from "BUG: " to the end of the output
# (boost throw info spans multiple lines).
BUG_BLOCK_RE = re.compile(r"^BUG: (.+)", re.MULTILINE | re.DOTALL)

# Fallback: C++ runtime "terminate called" message (solAssert / noexcept boundary).
TERMINATE_RE = re.compile(
    r"terminate called after throwing[^\n]*\n\s*what\(\):\s*(.+)$",
    re.MULTILINE,
)

# Compiler errors we don't care about (already filtered by the harness C++
# code, but guarded here too in case crashes come from outside our setup).
BENIGN_RE = re.compile(
    "|".join(
        (
            r"UnimplementedFeatureError",
            r"StackTooDeepError",
            r"Stack too deep",
            r"CompilerError:.*[Ss]tack too deep",
        )
    ),
    re.MULTILINE,
)

# Source roots to grep for the error message, relative to this script.
_REPO_ROOT = Path(__file__).parent
SOLIDITY_SRC_ROOTS = [
    _REPO_ROOT / subdir
    for subdir in (
        "third-party/solidity/libsolidity",
        "third-party/solidity/libyul",
        "third-party/solidity/libevmasm",
        "third-party/solidity/liblangutil",
        "third-party/solidity/libsolutil",
        "third-party/solidity/libsmtutil",
        "third-party/solidity/libsolc",
    )
]

# Memo of error message -> resolved source location string.
_SRC_CACHE: dict = {}

# Ordered (pattern, replacement) pairs applied in sequence to normalise a
# bug message for de-duplication.
NORM_SUBS = [
    (re.compile(raw_pattern), replacement)
    for raw_pattern, replacement in (
        # quoted identifiers / contract names
        (r'"[^"]{1,80}"', '"..."'),
        # source locations like "foo.sol:12:3"
        (r'\b\w+\.sol:\d+:\d+', '<loc>'),
        # plain numbers
        (r'\b\d+\b', 'N'),
        # hex values
        (r'0x[0-9a-fA-F]+', '0x...'),
        # collapse whitespace
        (r'\s+', ' '),
    )
]
| # --------------------------------------------------------------------------- | |
| # Data types | |
| # --------------------------------------------------------------------------- | |
@dataclasses.dataclass
class CrashItem:
    """A single crash input queued for replay."""

    # sha256-derived digest of the file contents
    key: str
    # location of the crash file on disk
    path: Path
    # path as a string, relative to the output dir when possible
    rel_path: str
    # harness to replay with
    binary: Path
@dataclasses.dataclass
class TriageResult:
    """Outcome of replaying one crash input against its harness."""

    # same digest as the originating CrashItem
    key: str
    # "reproduced" | "no_repro" | "timeout" | "benign"
    status: str
    # short, human-readable failure message (may be empty)
    panic_msg: str
    # resolved source location, or a marker such as "timeout" / "error" / "unknown"
    location: str
    # the crash input this result belongs to
    item: CrashItem
| # --------------------------------------------------------------------------- | |
| # Utilities | |
| # --------------------------------------------------------------------------- | |
def normalize_bug(msg: str) -> str:
    """Return a canonical form of *msg* for de-duplication.

    Every substitution in ``NORM_SUBS`` is applied in order and the result
    is capped at 200 characters. A falsy message yields an empty string.
    """
    if msg:
        canonical = msg
        for pattern, replacement in NORM_SUBS:
            canonical = pattern.sub(replacement, canonical)
        return canonical[:200]
    return ""
def file_hash(path: Path) -> str:
    """Return the first 40 hex characters of the SHA-256 of the file at *path*."""
    digest = hashlib.sha256()
    digest.update(path.read_bytes())
    hex_digest = digest.hexdigest()
    return hex_digest[:40]
def progress_bar(current, total, reproduced, no_repro, timeouts, width=40):
    """Redraw a one-line progress bar with live counters on stderr.

    The line starts with a carriage return so each call overwrites the
    previous one. A *total* of zero is drawn as a full, 100% bar.
    """
    if total:
        pct = current * 100 // total
        filled = current * width // total
    else:
        pct, filled = 100, width

    bar = "".join(("#" * filled, "-" * (width - filled)))
    line = (
        f"\r[{bar}] {current}/{total} ({pct}%)"
        f" repro:{reproduced} no_repro:{no_repro} timeout:{timeouts} "
    )
    stream = sys.stderr
    stream.write(line)
    stream.flush()
def _boost_throw_location(error_msg: str) -> str:
    """Return the throw site embedded in a boost exception dump, or ""."""
    # e.g. "/solidity/libsolidity/ast/Types.cpp(3753): Throw in function ..."
    # The capture starts *after* the "/solidity/" prefix so that the path can
    # be re-rooted under third-party/solidity without doubling that component.
    # NOTE: locations produced before this fix contained
    # "third-party/solidity/solidity/..."; entries cached or triaged under the
    # old spelling will not match newly computed ones.
    throw_site = re.search(
        r'/solidity/(\S+\.cpp)\((\d+)\):\s*Throw in function\s+(\S+)',
        error_msg,
    )
    if not throw_site:
        return ""
    rel_path, line_no, function = throw_site.groups()
    return f"third-party/solidity/{rel_path}:{line_no} ({function})"


def _grep_source_location(error_msg: str) -> str:
    """Locate the static part of *error_msg* in the Solidity sources via grep."""
    # Drop the exception-class prefix, then keep only the text before the
    # first quote, digit or newline: that is the part likely to be a literal
    # in the C++ source.
    needle = re.sub(r'^(InternalCompilerError|Exception):\s*', '', error_msg).strip()
    needle = re.split(r'["\d\n]', needle)[0].strip().rstrip('.,: ')
    if len(needle) < 8:
        # Too short to be a meaningful search string.
        return ""

    repo_root = Path(__file__).parent
    all_hits = []
    for root in SOLIDITY_SRC_ROOTS:
        if not root.exists():
            continue
        try:
            proc = subprocess.run(
                # "-e" keeps a needle that begins with "-" from being parsed
                # as a grep option.
                ["grep", "-rnF", "--include=*.cpp", "--include=*.h",
                 "-e", needle, str(root)],
                capture_output=True, text=True, errors="replace", timeout=10,
            )
        except (OSError, ValueError, subprocess.SubprocessError):
            # Best effort: grep missing, timed out, or unusable needle.
            continue
        for line in proc.stdout.splitlines():
            m = re.match(r'^(.*?):(\d+):', line)
            if not m:
                continue
            try:
                rel = Path(m.group(1)).relative_to(repo_root)
            except ValueError:
                rel = Path(m.group(1))
            all_hits.append(f"{rel}:{m.group(2)}")

    if not all_hits:
        return ""
    if len(all_hits) == 1:
        return all_hits[0]

    def score(hit):
        # Prefer paths without "test" in them, then shorter paths.
        path_part = hit.rsplit(":", 1)[0]
        return ("test" in path_part, len(path_part))

    best = min(all_hits, key=score)
    return f"{best} (+{len(all_hits) - 1} more)"


def find_source_location(error_msg: str) -> str:
    """Extract throw location from the ICE message, falling back to source grep.

    Boost exception messages embed the actual throw site:
        /solidity/libsolidity/ast/Types.cpp(3753): Throw in function ...
    That is parsed first; only when it is absent is the C++ source grepped
    for the static part of the message.

    Results (including empty ones) are memoised in ``_SRC_CACHE`` keyed by
    the exact *error_msg*.

    Returns:
        A "path:line" style string (optionally with the throwing function or
        a "(+N more)" suffix), or "" when nothing could be located.
    """
    if error_msg in _SRC_CACHE:
        return _SRC_CACHE[error_msg]

    result = _boost_throw_location(error_msg) or _grep_source_location(error_msg)
    _SRC_CACHE[error_msg] = result
    return result
| # --------------------------------------------------------------------------- | |
| # Crash file discovery | |
| # --------------------------------------------------------------------------- | |
def harness_from_path(crash_path: Path, default_binary: Path) -> Path:
    """Infer which harness to use from the crash path.

    If any path component contains "json" (case-insensitive) the JSON harness
    is tried first; the .sol harness is always tried next. The first harness
    that exists next to this script wins, otherwise *default_binary* is
    returned.
    """
    script_dir = Path(__file__).parent
    mentions_json = any("json" in part.lower() for part in crash_path.parts)

    preference = ["build/harness_json"] if mentions_json else []
    preference.append("build/harness_sol")

    for relative in preference:
        harness = script_dir / relative
        if harness.exists():
            return harness
    return default_binary
def collect_crash_files(output_dir: Path):
    """Find AFL crash inputs under *output_dir*.

    *output_dir* may itself be a ``crashes`` / ``crashes.*`` directory, or a
    parent that is searched recursively for such directories. Only regular
    files whose name starts with ``id:`` are considered crash inputs.

    Returns:
        ``(files, crash_dirs)`` where ``files`` is sorted.
    """
    def is_crash_dir_name(name):
        return name == "crashes" or name.startswith("crashes.")

    if is_crash_dir_name(output_dir.name):
        crash_dirs = [output_dir]
    else:
        crash_dirs = sorted(
            candidate
            for candidate in output_dir.rglob("crashes*")
            if candidate.is_dir() and is_crash_dir_name(candidate.name)
        )

    files = sorted(
        entry
        for crash_dir in crash_dirs
        for entry in crash_dir.iterdir()
        if entry.is_file() and entry.name.startswith("id:")
    )
    return files, crash_dirs
| # --------------------------------------------------------------------------- | |
| # Reproduction | |
| # --------------------------------------------------------------------------- | |
def reproduce_crash(item: CrashItem, timeout: int) -> TriageResult:
    """Replay one crash input through its harness and classify the outcome.

    The crash file is fed on stdin with HARNESS_VERBOSE=1. Classification
    order: timeout, launch error, benign compiler error, "BUG:" marker,
    C++ "terminate called" message, then any exit status other than 0 or 1.
    """
    def verdict(status, panic_msg="", location=""):
        return TriageResult(key=item.key, status=status,
                            panic_msg=panic_msg, location=location, item=item)

    child_env = dict(os.environ)
    child_env["HARNESS_VERBOSE"] = "1"
    # Pass AFL_MAP_SIZE so the binary doesn't complain
    if "AFL_MAP_SIZE" not in child_env:
        child_env["AFL_MAP_SIZE"] = "10000000"

    try:
        with open(item.path, "rb") as crash_input:
            proc = subprocess.run(
                [str(item.binary)],
                stdin=crash_input,
                capture_output=True,
                timeout=timeout,
                env=child_env,
            )
    except subprocess.TimeoutExpired:
        return verdict("timeout", "TIMEOUT", "timeout")
    except Exception as exc:
        return verdict("no_repro", str(exc), "error")

    output = (proc.stderr + proc.stdout).decode(errors="replace")
    exit_code = proc.returncode

    if BENIGN_RE.search(output):
        return verdict("benign")

    marker = BUG_RE.search(output)
    if marker:
        # first line only — for display/dedup
        headline = marker.group(1).strip()
        # the full block is what may carry the boost throw location
        block = BUG_BLOCK_RE.search(output)
        full_bug = block.group(1) if block else headline
        return verdict("reproduced", headline, find_source_location(full_bug))

    abnormal_exit = exit_code not in (0, 1)

    # Fallback: C++ terminate / solAssert path (no BUG: marker)
    terminated = TERMINATE_RE.search(output)
    if terminated and abnormal_exit:
        headline = f"terminate: {terminated.group(1).strip()}"
        return verdict("reproduced", headline, find_source_location(headline))

    # No BUG: marker but non-zero exit (native crash / ASAN abort without message)
    if abnormal_exit:
        # Python reports signal deaths as -signum; shell/waitpid uses 128+signum
        if exit_code < 0:
            sig = -exit_code
        elif exit_code > 128:
            sig = exit_code - 128
        else:
            sig = exit_code
        return verdict("reproduced", f"signal {sig} (exit {exit_code})", "unknown")

    return verdict("no_repro")
| # --------------------------------------------------------------------------- | |
| # State / cache I/O | |
| # --------------------------------------------------------------------------- | |
def state_key(location, panic_msg):
    """Build the triage-state lookup key: location and message joined by "||"."""
    return "{}||{}".format(location, panic_msg)
def load_triage_state(path):
    """Load the triage state JSON at *path*; a missing file yields ``{}``."""
    state_file = Path(path)
    if not state_file.exists():
        return {}
    return json.loads(state_file.read_text())
def load_cache(path: Path) -> dict:
    """Load the persistent result cache at *path*; a missing file yields ``{}``."""
    if not path.exists():
        return {}
    raw = path.read_text()
    return json.loads(raw)
def save_cache(path: Path, cache: dict):
    """Persist *cache* as JSON at *path* without exposing a partial file.

    The JSON is written to a sibling "<name>.tmp" file and then moved over
    *path* with ``os.replace``, so an interrupt or crash during the write
    cannot leave a truncated cache that would fail to parse on the next run.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(cache))
    os.replace(tmp_path, path)
def save_confirmed(result: "TriageResult", confirmed_dir: Path):
    """Copy a reproduced crash into *confirmed_dir* with .bug/.loc sidecars.

    The copy normally keeps the crash file's own name. Crash files from
    different fuzzer instances can share a name while holding different
    inputs; in that case the copy is stored as "<name>.<key prefix>" so the
    sidecars always describe the input saved next to them instead of
    overwriting the sidecars of an unrelated, earlier crash.

    Args:
        result: triage result; ``item.path``, ``key``, ``panic_msg`` and
            ``location`` are read.
        confirmed_dir: existing directory that receives the files.
    """
    source = result.item.path
    dest = confirmed_dir / source.name
    if dest.exists() and dest.read_bytes() != source.read_bytes():
        # Same name, different input: disambiguate with the content key.
        dest = confirmed_dir / f"{source.name}.{result.key[:12]}"
    if not dest.exists():
        shutil.copy2(source, dest)
    Path(str(dest) + ".bug").write_text(result.panic_msg + "\n")
    # "unknown" / empty locations carry no information worth a sidecar.
    if result.location and result.location != "unknown":
        Path(str(dest) + ".loc").write_text(result.location + "\n")
| # --------------------------------------------------------------------------- | |
| # Triage engine | |
| # --------------------------------------------------------------------------- | |
def run_triage(pending, timeout, jobs, cache, confirmed_dir):
    """Replay every pending crash, updating *cache* and the confirmed dir.

    Args:
        pending: crash items to replay.
        timeout: per-crash timeout in seconds, forwarded to reproduce_crash.
        jobs: number of worker threads; values <= 1 run sequentially.
        cache: dict mutated in place; maps crash key to
            ``[is_bug, panic_msg, location]``.
        confirmed_dir: directory that receives reproduced crashes.

    Returns:
        ``(interrupted, stats)`` where ``interrupted`` is True if the run was
        cut short by Ctrl-C and ``stats`` holds the per-status counters.
    """
    total = len(pending)
    stats = {"checked": 0, "reproduced": 0, "no_repro": 0, "timeout": 0, "benign": 0}
    # Messages from crashes whose replay could not even be attempted.
    launch_failures = []

    def handle(r: "TriageResult"):
        stats["checked"] += 1
        if r.status == "reproduced":
            stats["reproduced"] += 1
            save_confirmed(r, confirmed_dir)
            cache[r.key] = [True, r.panic_msg, r.location]
        elif r.status == "timeout":
            stats["timeout"] += 1
            cache[r.key] = [True, r.panic_msg, r.location]
        elif r.status == "benign":
            stats["benign"] += 1
            cache[r.key] = [False, None, None]
        else:
            stats["no_repro"] += 1
            if r.location == "error":
                # reproduce_crash marks failures to launch the harness or
                # read the input with location "error". Caching those would
                # permanently record the crash as a non-bug, so leave it
                # uncached and let the next run retry it.
                launch_failures.append(r.panic_msg)
            else:
                cache[r.key] = [False, None, None]
        progress_bar(stats["checked"], total,
                     stats["reproduced"], stats["no_repro"] + stats["benign"], stats["timeout"])

    interrupted = False
    try:
        if jobs <= 1:
            for item in pending:
                handle(reproduce_crash(item, timeout))
        else:
            with ThreadPoolExecutor(max_workers=jobs) as ex:
                futures = [ex.submit(reproduce_crash, item, timeout) for item in pending]
                try:
                    for f in as_completed(futures):
                        handle(f.result())
                except KeyboardInterrupt:
                    # Leaving the "with" block waits for every submitted
                    # future; cancel the ones that have not started so that
                    # Ctrl-C only waits for replays already in flight.
                    for f in futures:
                        f.cancel()
                    raise
    except KeyboardInterrupt:
        interrupted = True
        sys.stderr.write("\n\n*** INTERRUPTED — partial results below ***\n\n")

    # Wipe the progress bar line.
    sys.stderr.write("\r" + " " * 100 + "\r")
    sys.stderr.flush()

    if launch_failures:
        sys.stderr.write(
            f"warning: {len(launch_failures)} crash(es) could not be replayed "
            f"and were not cached (first error: {launch_failures[0]})\n"
        )
    return interrupted, stats
| # --------------------------------------------------------------------------- | |
| # Argument parsing | |
| # --------------------------------------------------------------------------- | |
def parse_args():
    """Define the command-line interface and parse the process arguments."""
    parser = argparse.ArgumentParser(description="Triage Solidity fuzzer crashes")
    add = parser.add_argument

    add(
        "output_dir",
        nargs="?",
        default="out/",
        help="AFL output dir (scans all crashes/ subdirs) or single crashes/ dir",
    )
    add(
        "--binary",
        default=None,
        help="Override harness binary (default: auto-detected per crash)",
    )
    add(
        "--show-input",
        action="store_true",
        help="Print first 200 chars of crash input",
    )
    add(
        "--state",
        default="reports/triage_state.json",
        help="Triage state file (default: reports/triage_state.json)",
    )
    add(
        "-j", "--jobs",
        type=int,
        default=1,
        help="Parallel reproduction workers (default: 1)",
    )
    add(
        "--timeout",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"Per-crash timeout in seconds (default: {DEFAULT_TIMEOUT})",
    )
    return parser.parse_args()
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """Entry point: discover crashes, replay uncached ones, print a report.

    Exits with status 1 when the output dir or its crashes/ dirs are missing,
    0 when there are no crash files, and 130 when the run was interrupted.
    """
    args = parse_args()
    output_dir = Path(args.output_dir)
    # An explicit --binary is an override: it is used for every crash and
    # per-crash auto-detection is skipped entirely.
    binary_override = Path(args.binary) if args.binary else None
    default_binary = Path("build/harness_sol")

    if not output_dir.exists():
        print(f"Output dir not found: {output_dir}", file=sys.stderr)
        sys.exit(1)

    triage_state = load_triage_state(args.state)
    crash_files, crash_dirs = collect_crash_files(output_dir)
    if not crash_dirs:
        print(f"No crashes/ dirs found under {output_dir}", file=sys.stderr)
        sys.exit(1)
    if not crash_files:
        print("No crash files found.")
        sys.exit(0)

    cache_path = output_dir / "triage_cache.json"
    cache = load_cache(cache_path)
    confirmed_dir = output_dir / "confirmed"
    confirmed_dir.mkdir(parents=True, exist_ok=True)

    file_hashes: dict = {}
    pending: list = []
    queued = set()  # hashes already scheduled in this run
    cached_count = 0
    for cf in crash_files:
        h = file_hash(cf)
        file_hashes[cf] = h
        if h in cache:
            cached_count += 1
            continue
        if h in queued:
            # Byte-identical input found by another instance: one replay
            # covers every copy because results are keyed by content hash.
            continue
        queued.add(h)
        if binary_override is not None:
            binary = binary_override
        else:
            binary = harness_from_path(cf, default_binary)
        try:
            rel = str(cf.relative_to(output_dir))
        except ValueError:
            rel = str(cf)
        pending.append(CrashItem(key=h, path=cf, rel_path=rel, binary=binary))

    print(f"Found {len(crash_files)} crashes from {len(crash_dirs)} instance(s)")
    for cd in crash_dirs:
        n = sum(1 for f in cd.iterdir() if f.is_file() and f.name.startswith("id:"))
        try:
            rel = cd.relative_to(output_dir)
        except ValueError:
            rel = cd
        print(f" {rel}: {n} crashes")
    if cached_count:
        print(f"Cached: {cached_count}, new: {len(pending)}")
    if args.jobs > 1:
        print(f"Workers: {args.jobs}")
    print()

    interrupted = False
    if pending:
        interrupted, _ = run_triage(pending, args.timeout, args.jobs, cache, confirmed_dir)
        save_cache(cache_path, cache)

    # Collect all results for reporting
    bugs = []
    filtered = 0
    for cf in crash_files:
        h = file_hashes[cf]
        if h not in cache:
            # Not replayed (interrupted run or replay error): nothing to report.
            continue
        is_bug, panic_msg, location = cache[h][0], cache[h][1], cache[h][2]
        if is_bug:
            bugs.append((cf, panic_msg or "", location or "unknown"))
        else:
            filtered += 1

    # Dedup by (location, normalised panic)
    by_location: dict = defaultdict(list)
    for cf, panic_msg, location in bugs:
        norm = normalize_bug(panic_msg)
        by_location[(location, norm)].append((cf, panic_msg))

    new_hashes = {item.key for item in pending}
    triaged, known, new = [], [], []
    # Largest groups first.
    for norm_key, files in sorted(by_location.items(), key=lambda x: -len(x[1])):
        location, norm_panic = norm_key
        rep_panic = files[0][1]
        sk = state_key(location, rep_panic)
        # The state file may be keyed by the raw or the normalised message.
        entry = triage_state.get(sk) or triage_state.get(state_key(location, norm_panic))
        is_new = any(file_hashes[cf] in new_hashes for cf, _ in files)
        if entry:
            triaged.append((norm_key, files, entry))
        elif is_new:
            new.append((norm_key, files))
        else:
            known.append((norm_key, files))

    print("=" * 72)
    print(
        f"RESULTS: {len(bugs)} bugs, {filtered} filtered, "
        f"{len(by_location)} unique locations "
        f"({len(triaged)} triaged, {len(known)} known, {len(new)} new)"
    )
    print("=" * 72)
    print()

    if not bugs:
        print("No real bugs found (all crashes filtered).")
        if interrupted:
            sys.exit(130)
        return

    def print_bug(idx, location, panic_msg, files, tag=""):
        print(f"[{idx}] {location}{tag}")
        src = find_source_location(panic_msg)
        if src:
            print(f" source: {src}")
        print(f" panic: {panic_msg}")
        print(f" count: {len(files)}")
        first_cf = files[0][0]
        try:
            rel = first_cf.relative_to(output_dir)
        except ValueError:
            rel = first_cf
        print(f" first: {rel}")
        if args.show_input:
            print(f" input: {first_cf.read_bytes()[:200]!r}")
        print()

    idx = 1
    if triaged:
        print("--- TRIAGED " + "-" * 60)
        print()
        for (location, _), files, entry in triaged:
            print_bug(idx, location, files[0][1], files, f" [{entry['status']}]")
            idx += 1
    if known:
        print("--- KNOWN (untriaged) " + "-" * 51)
        print()
        for (location, _), files in known:
            print_bug(idx, location, files[0][1], files)
            idx += 1
    if new:
        print("--- NEW " + "-" * 64)
        print()
        for (location, _), files in new:
            print_bug(idx, location, files[0][1], files)
            idx += 1

    if interrupted:
        sys.exit(130)
# Run the triage only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment