Skip to content

Instantly share code, notes, and snippets.

@jubnzv

jubnzv/triage.py Secret

Created April 23, 2026 05:24
Show Gist options
  • Select an option

  • Save jubnzv/827f06a16e2127c1bfed17de0c139619 to your computer and use it in GitHub Desktop.

Select an option

Save jubnzv/827f06a16e2127c1bfed17de0c139619 to your computer and use it in GitHub Desktop.
Triage script for Solidity fuzzing campaign: https://nowarp.io/blog/compiler-testing-part-1
#!/usr/bin/env python3
"""Triage AFL++ crash inputs against the Solidity compiler fuzzer.
Usage:
./triage.py [output_dir] [--binary PATH] [-j N] [--show-input]
Defaults:
output_dir: out/
binary: auto-detected per crash (build/harness_sol or build/harness_json)
Automatically discovers crashes/ dirs under out/sol/ and out/json/ for all
workers. Also works when pointed at a single crashes/ dir.
Set HARNESS_VERBOSE=1 to see BUG: output during manual replay:
HARNESS_VERBOSE=1 ./build/harness_sol < crash_file 2>&1
Features:
- Persistent JSON cache (incremental re-runs)
- Bug classification via reports/triage_state.json (TRIAGED/KNOWN/NEW)
- Parallel reproduction with -j N
- Progress bar with live counters
- Bug normalisation for dedup
- KeyboardInterrupt prints partial results
- Confirmed crashes saved to <output>/confirmed/ with .bug/.loc sidecars
=======================================================================
Reproducing via the public solc CLI (for upstream-facing reports)
=======================================================================
For crashes found by harness_sol (raw .sol source input):
# With optimiser (covers most bugs):
solc --optimize --bin /path/to/crash_file
# Without:
solc --bin /path/to/crash_file
# viaIR path:
solc --optimize --via-ir --bin /path/to/crash_file
For crashes found by harness_json (Standard JSON input):
solc --standard-json < /path/to/crash_file
The version of solc must match the submodule HEAD:
cd third-party/solidity && git log --oneline -1
=======================================================================
Internal harness replay (never in upstream reports)
=======================================================================
HARNESS_VERBOSE=1 ./build/harness_sol < crash_file 2>&1
HARNESS_VERBOSE=1 ./build/harness_json < crash_file 2>&1
"""
import argparse
import dataclasses
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# ---------------------------------------------------------------------------
# Constants & regexes
# ---------------------------------------------------------------------------
DEFAULT_TIMEOUT = 30
# Printed by our harnesses when HARNESS_VERBOSE=1
# Captures everything from "BUG: " to end of output (boost throw info spans multiple lines)
BUG_RE = re.compile(r"^BUG: (.+)$", re.MULTILINE)
BUG_BLOCK_RE = re.compile(r"^BUG: (.+)", re.MULTILINE | re.DOTALL)
# Fallback: C++ runtime "terminate called" message (solAssert / noexcept boundary)
TERMINATE_RE = re.compile(
r"terminate called after throwing[^\n]*\n\s*what\(\):\s*(.+)$",
re.MULTILINE,
)
# Compiler errors we don't care about (already filtered by harness C++ code,
# but guard here too in case someone feeds crashes from outside our setup)
BENIGN_RE = re.compile(
r"UnimplementedFeatureError"
r"|StackTooDeepError"
r"|Stack too deep"
r"|CompilerError:.*[Ss]tack too deep",
re.MULTILINE,
)
# Source roots to grep for the error message
SOLIDITY_SRC_ROOTS = [
Path(__file__).parent / "third-party/solidity/libsolidity",
Path(__file__).parent / "third-party/solidity/libyul",
Path(__file__).parent / "third-party/solidity/libevmasm",
Path(__file__).parent / "third-party/solidity/liblangutil",
Path(__file__).parent / "third-party/solidity/libsolutil",
Path(__file__).parent / "third-party/solidity/libsmtutil",
Path(__file__).parent / "third-party/solidity/libsolc",
]
_SRC_CACHE: dict = {}
NORM_SUBS = [
# quoted identifiers / contract names
(re.compile(r'"[^"]{1,80}"'), '"..."'),
# source locations like "foo.sol:12:3"
(re.compile(r'\b\w+\.sol:\d+:\d+'), '<loc>'),
# plain numbers
(re.compile(r'\b\d+\b'), 'N'),
# hex values
(re.compile(r'0x[0-9a-fA-F]+'), '0x...'),
# collapse whitespace
(re.compile(r'\s+'), ' '),
]
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class CrashItem:
key: str # sha256 of file contents
path: Path
rel_path: str
binary: Path # harness to replay with
@dataclasses.dataclass
class TriageResult:
key: str
status: str # "reproduced" | "no_repro" | "timeout" | "benign"
panic_msg: str
location: str
item: CrashItem
# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------
def normalize_bug(msg: str) -> str:
if not msg:
return ""
norm = msg
for pat, repl in NORM_SUBS:
norm = pat.sub(repl, norm)
return norm[:200]
def file_hash(path: Path) -> str:
return hashlib.sha256(path.read_bytes()).hexdigest()[:40]
def progress_bar(current, total, reproduced, no_repro, timeouts, width=40):
if total == 0:
pct, filled = 100, width
else:
pct = current * 100 // total
filled = current * width // total
bar = "#" * filled + "-" * (width - filled)
sys.stderr.write(
f"\r[{bar}] {current}/{total} ({pct}%)"
f" repro:{reproduced} no_repro:{no_repro} timeout:{timeouts} "
)
sys.stderr.flush()
def find_source_location(error_msg: str) -> str:
"""Extract throw location from the ICE message, falling back to source grep.
Boost exception messages embed the actual throw site:
/solidity/libsolidity/ast/Types.cpp(3753): Throw in function ...
Parse that first before grepping.
"""
if error_msg in _SRC_CACHE:
return _SRC_CACHE[error_msg]
# Primary: boost throw location embedded in the message
# e.g. "/solidity/libsolidity/ast/Types.cpp(3753): Throw in function ..."
boost_loc = re.search(r'(/solidity/\S+\.cpp)\((\d+)\):\s*Throw in function\s+(\S+)', error_msg)
if boost_loc:
# Make the path relative to our repo root by stripping the /solidity/ prefix
rel = "third-party/solidity" + boost_loc.group(1)
result = f"{rel}:{boost_loc.group(2)} ({boost_loc.group(3)})"
_SRC_CACHE[error_msg] = result
return result
# Fallback: grep the C++ source for the static error string
needle = re.sub(r'^(InternalCompilerError|Exception):\s*', '', error_msg).strip()
needle = re.split(r'["\d\n]', needle)[0].strip().rstrip('.,: ')
if len(needle) < 8:
_SRC_CACHE[error_msg] = ""
return ""
all_hits = []
for root in SOLIDITY_SRC_ROOTS:
if not root.exists():
continue
try:
proc = subprocess.run(
["grep", "-rnF", "--include=*.cpp", "--include=*.h", needle, str(root)],
capture_output=True, text=True, timeout=10,
)
except Exception:
continue
for line in proc.stdout.splitlines():
m = re.match(r'^(.*?):(\d+):', line)
if not m:
continue
try:
rel = Path(m.group(1)).relative_to(Path(__file__).parent)
except ValueError:
rel = Path(m.group(1))
all_hits.append(f"{rel}:{m.group(2)}")
if not all_hits:
result = ""
elif len(all_hits) == 1:
result = all_hits[0]
else:
def score(h):
p = h.rsplit(":", 1)[0]
return ("test" in p, len(p))
all_hits.sort(key=score)
result = f"{all_hits[0]} (+{len(all_hits)-1} more)"
_SRC_CACHE[error_msg] = result
return result
# ---------------------------------------------------------------------------
# Crash file discovery
# ---------------------------------------------------------------------------
def harness_from_path(crash_path: Path, default_binary: Path) -> Path:
"""Infer which harness to use from the crash path."""
parts = [p.lower() for p in crash_path.parts]
if any("json" in p for p in parts):
candidate = Path(__file__).parent / "build/harness_json"
if candidate.exists():
return candidate
candidate = Path(__file__).parent / "build/harness_sol"
if candidate.exists():
return candidate
return default_binary
def collect_crash_files(output_dir: Path):
if output_dir.name == "crashes" or output_dir.name.startswith("crashes."):
crash_dirs = [output_dir]
else:
crash_dirs = sorted(
d for d in output_dir.rglob("crashes*")
if d.is_dir() and (d.name == "crashes" or d.name.startswith("crashes."))
)
files = []
for cd in crash_dirs:
for f in cd.iterdir():
if f.is_file() and f.name.startswith("id:"):
files.append(f)
return sorted(files), crash_dirs
# ---------------------------------------------------------------------------
# Reproduction
# ---------------------------------------------------------------------------
def reproduce_crash(item: CrashItem, timeout: int) -> TriageResult:
env = os.environ.copy()
env["HARNESS_VERBOSE"] = "1"
# Pass AFL_MAP_SIZE so the binary doesn't complain
env.setdefault("AFL_MAP_SIZE", "10000000")
try:
with open(item.path, "rb") as fh:
result = subprocess.run(
[str(item.binary)],
stdin=fh,
capture_output=True,
timeout=timeout,
env=env,
)
except subprocess.TimeoutExpired:
return TriageResult(key=item.key, status="timeout",
panic_msg="TIMEOUT", location="timeout", item=item)
except Exception as e:
return TriageResult(key=item.key, status="no_repro",
panic_msg=str(e), location="error", item=item)
output = (result.stderr + result.stdout).decode(errors="replace")
if BENIGN_RE.search(output):
return TriageResult(key=item.key, status="benign",
panic_msg="", location="", item=item)
bug_match = BUG_RE.search(output)
if bug_match:
panic_msg = bug_match.group(1).strip() # first line only — for display/dedup
bug_block = BUG_BLOCK_RE.search(output)
full_bug = bug_block.group(1) if bug_block else panic_msg
location = find_source_location(full_bug)
return TriageResult(key=item.key, status="reproduced",
panic_msg=panic_msg, location=location, item=item)
# Fallback: C++ terminate / solAssert path (no BUG: marker)
term_match = TERMINATE_RE.search(output)
if term_match and result.returncode not in (0, 1):
panic_msg = f"terminate: {term_match.group(1).strip()}"
location = find_source_location(panic_msg)
return TriageResult(key=item.key, status="reproduced",
panic_msg=panic_msg, location=location, item=item)
# No BUG: marker but non-zero exit (native crash / ASAN abort without message)
if result.returncode not in (0, 1):
# Python reports signal deaths as -signum; shell/waitpid uses 128+signum
if result.returncode < 0:
sig = -result.returncode
elif result.returncode > 128:
sig = result.returncode - 128
else:
sig = result.returncode
panic_msg = f"signal {sig} (exit {result.returncode})"
return TriageResult(key=item.key, status="reproduced",
panic_msg=panic_msg, location="unknown", item=item)
return TriageResult(key=item.key, status="no_repro",
panic_msg="", location="", item=item)
# ---------------------------------------------------------------------------
# State / cache I/O
# ---------------------------------------------------------------------------
def state_key(location, panic_msg):
return f"{location}||{panic_msg}"
def load_triage_state(path):
p = Path(path)
return json.loads(p.read_text()) if p.exists() else {}
def load_cache(path: Path) -> dict:
return json.loads(path.read_text()) if path.exists() else {}
def save_cache(path: Path, cache: dict):
path.write_text(json.dumps(cache))
def save_confirmed(result: TriageResult, confirmed_dir: Path):
dest = confirmed_dir / result.item.path.name
if not dest.exists():
shutil.copy2(result.item.path, dest)
Path(str(dest) + ".bug").write_text(result.panic_msg + "\n")
if result.location and result.location != "unknown":
Path(str(dest) + ".loc").write_text(result.location + "\n")
# ---------------------------------------------------------------------------
# Triage engine
# ---------------------------------------------------------------------------
def run_triage(pending, timeout, jobs, cache, confirmed_dir):
total = len(pending)
stats = {"checked": 0, "reproduced": 0, "no_repro": 0, "timeout": 0, "benign": 0}
def handle(r: TriageResult):
stats["checked"] += 1
if r.status == "reproduced":
stats["reproduced"] += 1
save_confirmed(r, confirmed_dir)
cache[r.key] = [True, r.panic_msg, r.location]
elif r.status == "timeout":
stats["timeout"] += 1
cache[r.key] = [True, r.panic_msg, r.location]
elif r.status == "benign":
stats["benign"] += 1
cache[r.key] = [False, None, None]
else:
stats["no_repro"] += 1
cache[r.key] = [False, None, None]
progress_bar(stats["checked"], total,
stats["reproduced"], stats["no_repro"] + stats["benign"], stats["timeout"])
interrupted = False
try:
if jobs <= 1:
for item in pending:
handle(reproduce_crash(item, timeout))
else:
with ThreadPoolExecutor(max_workers=jobs) as ex:
futures = {ex.submit(reproduce_crash, item, timeout): item for item in pending}
for f in as_completed(futures):
handle(f.result())
except KeyboardInterrupt:
interrupted = True
sys.stderr.write("\n\n*** INTERRUPTED — partial results below ***\n\n")
sys.stderr.write("\r" + " " * 100 + "\r")
sys.stderr.flush()
return interrupted, stats
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def parse_args():
p = argparse.ArgumentParser(description="Triage Solidity fuzzer crashes")
p.add_argument("output_dir", nargs="?", default="out/",
help="AFL output dir (scans all crashes/ subdirs) or single crashes/ dir")
p.add_argument("--binary", default=None,
help="Override harness binary (default: auto-detected per crash)")
p.add_argument("--show-input", action="store_true",
help="Print first 200 chars of crash input")
p.add_argument("--state", default="reports/triage_state.json",
help="Triage state file (default: reports/triage_state.json)")
p.add_argument("-j", "--jobs", type=int, default=1,
help="Parallel reproduction workers (default: 1)")
p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT,
help=f"Per-crash timeout in seconds (default: {DEFAULT_TIMEOUT})")
return p.parse_args()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
args = parse_args()
output_dir = Path(args.output_dir)
default_binary = Path(args.binary) if args.binary else Path("build/harness_sol")
if not output_dir.exists():
print(f"Output dir not found: {output_dir}", file=sys.stderr)
sys.exit(1)
triage_state = load_triage_state(args.state)
crash_files, crash_dirs = collect_crash_files(output_dir)
if not crash_dirs:
print(f"No crashes/ dirs found under {output_dir}", file=sys.stderr)
sys.exit(1)
if not crash_files:
print("No crash files found.")
sys.exit(0)
cache_path = output_dir / "triage_cache.json"
cache = load_cache(cache_path)
confirmed_dir = output_dir / "confirmed"
confirmed_dir.mkdir(parents=True, exist_ok=True)
file_hashes: dict = {}
pending: list = []
for cf in crash_files:
h = file_hash(cf)
file_hashes[cf] = h
if h not in cache:
binary = harness_from_path(cf, default_binary)
try:
rel = str(cf.relative_to(output_dir))
except ValueError:
rel = str(cf)
pending.append(CrashItem(key=h, path=cf, rel_path=rel, binary=binary))
cached_count = len(crash_files) - len(pending)
print(f"Found {len(crash_files)} crashes from {len(crash_dirs)} instance(s)")
for cd in crash_dirs:
n = sum(1 for f in cd.iterdir() if f.is_file() and f.name.startswith("id:"))
try:
rel = cd.relative_to(output_dir)
except ValueError:
rel = cd
print(f" {rel}: {n} crashes")
if cached_count:
print(f"Cached: {cached_count}, new: {len(pending)}")
if args.jobs > 1:
print(f"Workers: {args.jobs}")
print()
interrupted = False
if pending:
interrupted, _ = run_triage(pending, args.timeout, args.jobs, cache, confirmed_dir)
save_cache(cache_path, cache)
# Collect all results for reporting
bugs = []
filtered = 0
for cf in crash_files:
h = file_hashes[cf]
if h not in cache:
continue
is_bug, panic_msg, location = cache[h][0], cache[h][1], cache[h][2]
if is_bug:
bugs.append((cf, panic_msg or "", location or "unknown"))
else:
filtered += 1
# Dedup by (location, normalised panic)
by_location: dict = defaultdict(list)
for cf, panic_msg, location in bugs:
norm = normalize_bug(panic_msg)
by_location[(location, norm)].append((cf, panic_msg))
new_hashes = {item.key for item in pending}
triaged, known, new = [], [], []
for norm_key, files in sorted(by_location.items(), key=lambda x: -len(x[1])):
location, norm_panic = norm_key
rep_panic = files[0][1]
sk = state_key(location, rep_panic)
entry = triage_state.get(sk) or triage_state.get(state_key(location, norm_panic))
is_new = any(file_hashes[cf] in new_hashes for cf, _ in files)
if entry:
triaged.append((norm_key, files, entry))
elif is_new:
new.append((norm_key, files))
else:
known.append((norm_key, files))
print("=" * 72)
print(
f"RESULTS: {len(bugs)} bugs, {filtered} filtered, "
f"{len(by_location)} unique locations "
f"({len(triaged)} triaged, {len(known)} known, {len(new)} new)"
)
print("=" * 72)
print()
if not bugs:
print("No real bugs found (all crashes filtered).")
if interrupted:
sys.exit(130)
return
def print_bug(idx, location, panic_msg, files, tag=""):
print(f"[{idx}] {location}{tag}")
src = find_source_location(panic_msg)
if src:
print(f" source: {src}")
print(f" panic: {panic_msg}")
print(f" count: {len(files)}")
first_cf = files[0][0]
try:
rel = first_cf.relative_to(output_dir)
except ValueError:
rel = first_cf
print(f" first: {rel}")
if args.show_input:
print(f" input: {first_cf.read_bytes()[:200]!r}")
print()
idx = 1
if triaged:
print("--- TRIAGED " + "-" * 60)
print()
for (location, _), files, entry in triaged:
print_bug(idx, location, files[0][1], files, f" [{entry['status']}]")
idx += 1
if known:
print("--- KNOWN (untriaged) " + "-" * 51)
print()
for (location, _), files in known:
print_bug(idx, location, files[0][1], files)
idx += 1
if new:
print("--- NEW " + "-" * 64)
print()
for (location, _), files in new:
print_bug(idx, location, files[0][1], files)
idx += 1
if interrupted:
sys.exit(130)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment