|
#!/usr/bin/env python3 |
|
from __future__ import annotations |
|
|
|
import argparse |
|
import hashlib |
|
import json |
|
import platform |
|
import re |
|
from collections import defaultdict |
|
from dataclasses import dataclass, field |
|
from pathlib import Path |
|
from typing import Iterable |
|
|
|
|
|
# Default byte cap for safe_read_text(); only this many leading bytes are read.
MAX_TEXT_READ = 1024 * 1024

# For files whose name contains "history", only this many trailing lines of
# the text that was read are pattern-scanned.
MAX_HISTORY_LINES = 5000

# Upper bound on the number of redacted sample lines kept per inspected file.
MAX_SUSPICIOUS_SAMPLES = 20

# Byte cap passed to safe_read_text() for files under Apifox storage dirs.
MAX_LEVELDB_SCAN_BYTES = 2 * 1024 * 1024
|
|
|
# Literal substrings treated as indicators of compromise.  The scanners test
# each one with a plain ``in`` check against decoded file text (Apifox storage
# files and local log/config files).
IOC_STRINGS = [
    "apifox.it.com",
    "/public/apifox-event.js",
    "/event/0/log",
    "/event/2/log",
    "af_uuid",
    "af_os",
    "af_user",
    "af_name",
    "af_apifox_user",
    "af_apifox_name",
    "_rl_headers",
    "_rl_mc",
    "common.accessToken",
]
|
|
|
# (label, compiled regex) pairs applied to each line of an inspected text
# file.  The labels double as keys of PATTERN_CHECKLIST_MAPPING.
SUSPICIOUS_PATTERNS = [
    ("OpenAI key", re.compile(r"\bsk-[A-Za-z0-9_-]{20,}\b")),
    ("GitHub token", re.compile(r"\bgh[pousr]_[A-Za-z0-9_]{20,}\b")),
    ("GitLab token", re.compile(r"\bglpat-[A-Za-z0-9_-]{20,}\b")),
    ("Slack token", re.compile(r"\bxox[baprs]-[A-Za-z0-9-]{10,}\b")),
    ("AWS access key", re.compile(r"\bAKIA[0-9A-Z]{16}\b")),
    ("Generic bearer token", re.compile(r"(?i)\bbearer\s+[A-Za-z0-9._=-]{16,}")),
    ("Password assignment", re.compile(r"(?i)\b(pass(word)?|pwd)\s*[:=]\s*['\"]?[^'\"\s]{6,}")),
    ("Token assignment", re.compile(r"(?i)\b(token|secret|api[_-]?key)\s*[:=]\s*['\"]?[^'\"\s]{8,}")),
    ("Database URL", re.compile(r"\b(?:postgres|postgresql|mysql|mongodb|redis|amqp)s?://[^\s'\"]+")),
    ("Private key block", re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----")),
]
|
|
|
# Maps a SUSPICIOUS_PATTERNS label to (rotation-checklist item id, service
# label shown in the checklist).
PATTERN_CHECKLIST_MAPPING = {
    "OpenAI key": ("openai_keys", "OpenAI API keys"),
    "GitHub token": ("github_tokens", "GitHub tokens"),
    "GitLab token": ("gitlab_tokens", "GitLab tokens"),
    "Slack token": ("slack_tokens", "Slack tokens"),
    "AWS access key": ("aws_access_keys", "AWS access keys"),
    "Generic bearer token": ("bearer_tokens", "Bearer / OAuth tokens"),
    "Password assignment": ("cli_passwords", "Passwords used in shell commands"),
    "Token assignment": ("shell_defined_tokens", "Tokens or secrets assigned in shell files"),
    "Database URL": ("database_credentials", "Database credentials in connection strings"),
    "Private key block": ("private_key_material", "Private key material"),
}
|
|
|
|
|
@dataclass
class Finding:
    """One audit observation produced by a scanner function."""

    # Grouping label, e.g. "apifox_indicator", "credential_material",
    # "possible_exposed_secret" or "network_or_log_ioc".
    category: str
    # Severity label; "critical", "high", "medium" and "low" are the values
    # that compute_risk_score() assigns explicit weights to.
    severity: str
    # Human-readable one-line description.
    title: str
    # File or directory the finding refers to, if any.
    path: str | None = None
    # Extra structured data; emitted into the JSON report as-is.
    details: dict[str, object] = field(default_factory=dict)
|
|
|
|
|
def sha256_text(value: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``value``.

    The text is encoded as UTF-8 first; characters that cannot be encoded
    are dropped instead of raising.
    """
    hasher = hashlib.sha256()
    hasher.update(value.encode("utf-8", errors="ignore"))
    return hasher.hexdigest()
|
|
|
|
|
def safe_read_text(path: Path, limit: int = MAX_TEXT_READ) -> str | None:
    """Read at most ``limit`` leading bytes of ``path`` as lenient UTF-8 text.

    Returns ``None`` when ``path`` is not a regular file or when any
    ``OSError`` occurs while checking or reading it.  Undecodable bytes are
    silently dropped.
    """
    try:
        if path.is_file():
            with path.open("rb") as handle:
                raw = handle.read(limit)
            return raw.decode("utf-8", errors="ignore")
        return None
    except OSError:
        return None
|
|
|
|
|
def safe_listdir(path: Path) -> list[Path]:
    """Return the sorted direct children of ``path``.

    Any ``OSError`` (missing directory, permission problem, not a directory)
    results in an empty list.
    """
    try:
        children = list(path.iterdir())
    except OSError:
        return []
    children.sort()
    return children
|
|
|
|
|
def redact_line(line: str) -> str:
    """Mask secret-looking substrings in ``line`` and cap it at 300 characters.

    The following shapes are masked:

    * ``key=value`` / ``key: value`` where the key contains token, secret,
      password, passwd, pass, pwd or api key; the value may be bare,
      single-quoted or double-quoted (quoted values are masked in full).
    * ``Authorization: Bearer <x>`` and any ``bearer <16+ token chars>``.
    * The userinfo part of URLs (``scheme://user:password@host``).
    * OpenAI ``sk-``, GitHub ``gh?_`` and GitLab ``glpat-`` tokens (a short
      prefix is kept so the token can still be identified).
    * Slack ``xox?-`` tokens and AWS ``AKIA`` access key ids.

    Args:
        line: A single line of text, typically from a shell history or rc file.

    Returns:
        The masked line, truncated to at most 300 characters.
    """
    # Assignments.  Quoted alternatives come first so a value such as
    # "two words" is consumed whole; the last alternative tolerates an
    # unterminated opening quote.
    line = re.sub(
        r"(token|secret|password|passwd|pass|pwd|api[_-]?key)\s*([=:])\s*"
        r"(?:\"[^\"]*\"|'[^']*'|['\"]?[^\s\"']+)",
        r"\1\2<redacted>",
        line,
        flags=re.I,
    )
    line = re.sub(r"(Authorization:\s*Bearer)\s+[^\s]+", r"\1 <redacted>", line, flags=re.I)
    # Bearer tokens that appear without an "Authorization:" prefix.
    line = re.sub(r"\b(bearer)\s+[A-Za-z0-9._=-]{16,}", r"\1 <redacted>", line, flags=re.I)
    # Credentials embedded in URLs (database URLs, git remotes, ...).
    line = re.sub(r"\b([a-z][a-z0-9+.-]*://)[^\s/@'\"]+@", r"\1<redacted>@", line, flags=re.I)
    line = re.sub(r"\b(sk-[A-Za-z0-9_-]{6})[A-Za-z0-9_-]+\b", r"\1<redacted>", line)
    line = re.sub(r"\b(gh[pousr]_[A-Za-z0-9_]{6})[A-Za-z0-9_]+\b", r"\1<redacted>", line)
    line = re.sub(r"\b(glpat-[A-Za-z0-9_-]{6})[A-Za-z0-9_-]+\b", r"\1<redacted>", line)
    line = re.sub(r"\b(xox[baprs]-)[A-Za-z0-9-]+", r"\1<redacted>", line)
    line = re.sub(r"\b(AKIA)[0-9A-Z]{16}\b", r"\1<redacted>", line)
    return line[:300]
|
|
|
|
|
def add_checklist_item(items: dict[str, dict[str, object]], key: str, service: str, reason: str, paths: Iterable[str]) -> None:
    """Register (or extend) a rotation-checklist entry.

    The first call for a given ``key`` fixes its ``service`` and ``reason``;
    later calls only contribute additional paths.  Falsy paths are skipped.
    """
    if key not in items:
        items[key] = {
            "service": service,
            "reason": reason,
            "paths": set(),
            "evidence": set(),
        }
    known_paths = items[key]["paths"]
    for candidate in paths:
        if candidate:
            known_paths.add(candidate)
|
|
|
|
|
def add_checklist_evidence(items: dict[str, dict[str, object]], key: str, evidence: Iterable[str]) -> None:
    """Attach evidence strings to an existing checklist entry.

    Does nothing when ``key`` has no (non-empty) entry in ``items``.  Falsy
    evidence strings are skipped.
    """
    entry = items.get(key)
    if entry:
        bucket = entry["evidence"]
        for note in evidence:
            if note:
                bucket.add(note)
|
|
|
|
|
def finalize_checklist(items: dict[str, dict[str, object]]) -> list[dict[str, object]]:
    """Convert the mutable checklist mapping into a JSON-friendly list.

    Path and evidence sets become sorted lists, the mapping key is exposed as
    ``id``, and the rows are ordered by their ``service`` label.
    """
    rows: list[dict[str, object]] = [
        {
            "id": key,
            "service": entry["service"],
            "reason": entry["reason"],
            "paths": sorted(entry["paths"]),
            "evidence": sorted(entry["evidence"]),
        }
        for key, entry in items.items()
    ]
    rows.sort(key=lambda row: row["service"])
    return rows
|
|
|
|
|
def detect_platform() -> str:
    """Map ``platform.system()`` to ``"macos"``, ``"windows"`` or ``"linux"``.

    Anything that is neither Darwin nor Windows is reported as ``"linux"``.
    """
    system = platform.system().lower()
    for marker, label in (("darwin", "macos"), ("windows", "windows")):
        if marker in system:
            return label
    return "linux"
|
|
|
|
|
def candidate_apifox_storage_dirs(home: Path, system_name: str) -> list[Path]:
    """List the directories where Apifox may keep its local storage.

    The macOS, Linux and Windows locations are always included (each with
    ``Apifox`` and ``apifox`` spellings); when ``system_name`` is ``"macos"``
    the sandbox container location is appended.  Duplicates are removed while
    keeping first-seen order.
    """
    parents = ["Library/Application Support", ".config", "AppData/Roaming"]
    if system_name == "macos":
        parents.append("Library/Containers/com.apifox.desktop/Data/Library/Application Support")

    # A dict keeps insertion order and drops equal paths.
    ordered: dict[Path, None] = {}
    for parent in parents:
        for leaf in ("Apifox", "apifox"):
            ordered.setdefault(home / parent / leaf, None)
    return list(ordered)
|
|
|
|
|
def scan_apifox_storage(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Search Apifox's local storage directories for IoC strings.

    For every candidate storage directory that exists, the files directly
    inside ``Local Storage/leveldb``, ``Session Storage`` and the directory
    itself are read (up to ``MAX_LEVELDB_SCAN_BYTES`` each) and checked for
    the substrings in ``IOC_STRINGS``.

    Candidate paths that refer to the same directory (``Apifox`` vs
    ``apifox`` on a case-insensitive filesystem, or symlinks) are scanned
    only once, so findings are not duplicated.

    Args:
        home: Home directory to inspect.
        checklist: Rotation checklist, updated in place when session-related
            keys are found.

    Returns:
        ``(findings, summary)`` where ``summary`` lists the storage
        directories found and the per-file indicator hits.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {
        "storage_dirs_found": [],
        "indicator_hits": [],
    }
    system_name = detect_platform()
    scannable_suffixes = {".ldb", ".log", ".json", ".txt"}
    session_indicators = {"_rl_mc", "_rl_headers", "common.accessToken"}
    high_severity_indicators = session_indicators | {"apifox.it.com"}
    seen_dirs = set()

    for storage_dir in candidate_apifox_storage_dirs(home, system_name):
        try:
            dir_stat = storage_dir.stat()
        except OSError:
            # Missing or unreadable candidate.
            continue
        # Only trust the identity when the platform reports a real inode.
        if dir_stat.st_ino:
            identity = (dir_stat.st_dev, dir_stat.st_ino)
            if identity in seen_dirs:
                continue
            seen_dirs.add(identity)
        summary["storage_dirs_found"].append(str(storage_dir))

        for subdir in [storage_dir / "Local Storage/leveldb", storage_dir / "Session Storage", storage_dir]:
            if not subdir.exists():
                continue
            for file_path in safe_listdir(subdir):
                if not file_path.is_file():
                    continue
                if file_path.suffix.lower() not in scannable_suffixes and "Preferences" not in file_path.name:
                    continue
                text = safe_read_text(file_path, MAX_LEVELDB_SCAN_BYTES)
                if not text:
                    continue
                hits = sorted({ioc for ioc in IOC_STRINGS if ioc in text})
                if not hits:
                    continue
                summary["indicator_hits"].append({"path": str(file_path), "hits": hits})
                severity = "high" if any(hit in high_severity_indicators for hit in hits) else "medium"
                findings.append(
                    Finding(
                        category="apifox_indicator",
                        severity=severity,
                        title="Apifox local storage contains IoCs related to the supply-chain attack",
                        path=str(file_path),
                        details={"hits": hits},
                    )
                )
                if any(hit in session_indicators for hit in hits):
                    add_checklist_item(
                        checklist,
                        "apifox_access",
                        "Apifox account/session",
                        "发现被攻击代码读取/写入过的本地存储键或 token 痕迹,应视作会话令牌可能暴露。",
                        [str(file_path)],
                    )
    return findings, summary
|
|
|
|
|
def classify_ssh_file(path: Path) -> tuple[str, str] | None:
    """Classify a file found under ``~/.ssh``.

    Returns an ``(item_id, label)`` pair.  Public keys and the well-known
    ``known_hosts`` / ``authorized_keys`` / ``config`` files are recognised by
    name alone; everything else is classified as a private key when its first
    64 KiB contain ``PRIVATE KEY``, when it has a conventional private-key
    file name, or when its suffix is ``.pem``, ``.key`` or ``.p8``.
    """
    lowered = path.name.lower()
    if lowered.endswith(".pub"):
        return ("ssh_public_key", "SSH public key")

    well_known = {
        "known_hosts": ("ssh_known_hosts", "SSH known_hosts"),
        "authorized_keys": ("ssh_authorized_keys", "SSH authorized_keys"),
        "config": ("ssh_config", "SSH config"),
    }
    if lowered in well_known:
        return well_known[lowered]

    head = safe_read_text(path, 65536) or ""
    private_key_names = {
        "id_rsa",
        "id_dsa",
        "id_ecdsa",
        "id_ed25519",
        "id_ecdsa_sk",
        "id_ed25519_sk",
        "identity",
    }
    looks_private = (
        "PRIVATE KEY" in head
        or lowered in private_key_names
        or path.suffix.lower() in {".pem", ".key", ".p8"}
    )
    if looks_private:
        return ("ssh_private_key", "SSH private key")
    return ("ssh_other_file", "Other file under ~/.ssh")
|
|
|
|
|
def scan_ssh(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Inventory the regular files directly inside ``~/.ssh``.

    Every classified file yields a finding (``critical`` for private keys,
    ``medium`` otherwise); private keys additionally add their path to the
    ``ssh_keys`` checklist item.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "files": []}

    ssh_dir = home / ".ssh"
    if not ssh_dir.exists():
        return findings, summary
    summary["present"] = True

    for entry in safe_listdir(ssh_dir):
        if not entry.is_file():
            continue
        summary["files"].append(entry.name)

        classified = classify_ssh_file(entry)
        if not classified:
            continue
        item_id, label = classified
        is_private_key = item_id == "ssh_private_key"

        findings.append(
            Finding(
                category="credential_material",
                severity="critical" if is_private_key else "medium",
                title=f"{label} present under ~/.ssh",
                path=str(entry),
            )
        )
        if is_private_key:
            add_checklist_item(
                checklist,
                "ssh_keys",
                "SSH key pairs",
                "文章确认攻击载荷会递归读取 ~/.ssh,私钥和相关配置应视作可能已泄露。",
                [str(entry)],
            )
    return findings, summary
|
|
|
|
|
def scan_git_credentials(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Report a plaintext ``~/.git-credentials`` file and the hosts it names.

    When the file is readable and non-empty, one ``critical`` finding is
    produced and the ``git_tokens`` checklist item is added.  Only host names
    are extracted; user names and passwords are not copied into the report.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "hosts": []}

    credentials_path = home / ".git-credentials"
    content = safe_read_text(credentials_path)
    if content:
        host_names = sorted(set(re.findall(r"https?://[^/@\s]+@([^/\s]+)", content)))
        summary["present"] = True
        summary["hosts"] = host_names
        findings.append(
            Finding(
                category="credential_material",
                severity="critical",
                title="Plaintext Git credentials file exists",
                path=str(credentials_path),
                details={"hosts": host_names},
            )
        )
        add_checklist_item(
            checklist,
            "git_tokens",
            "Git PAT / app passwords",
            "文章确认攻击载荷会读取 ~/.git-credentials,相关 GitHub/GitLab/自建 Git 令牌应全部吊销。",
            [str(credentials_path)],
        )
    return findings, summary
|
|
|
|
|
def scan_npmrc(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Detect registry credentials configured in ``~/.npmrc``.

    Credential lines are counted without copying their values.  A line
    counts when it sets ``_authToken``, ``_auth`` or ``_password``, either
    globally or scoped to a registry URI.  The URI prefix may itself contain
    colons (for example ``//localhost:4873/:_authToken=...``).

    Args:
        home: Home directory to inspect.
        checklist: Rotation checklist, updated in place when credentials are
            found.

    Returns:
        ``(findings, summary)``; ``summary["token_lines"]`` is the number of
        credential lines and ``summary["registries"]`` the configured
        registry URLs.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "token_lines": 0, "registries": []}
    path = home / ".npmrc"
    text = safe_read_text(path)
    if not text:
        return findings, summary
    summary["present"] = True

    # The optional "//host[:port]/path/:" prefix is matched greedily and then
    # backtracks to the last colon before the credential key, so ports work.
    token_lines = re.findall(r"(?m)^\s*(?://[^\s=]+:)?_(?:authToken|auth|password)\s*=", text)
    registries = sorted(set(re.findall(r"(?m)^\s*@?[^:\s]*:registry\s*=\s*(\S+)", text)))
    summary["token_lines"] = len(token_lines)
    summary["registries"] = registries

    if token_lines:
        findings.append(
            Finding(
                category="credential_material",
                severity="critical",
                title="npm auth token configuration found in ~/.npmrc",
                path=str(path),
                details={"token_lines": len(token_lines), "registries": registries},
            )
        )
        add_checklist_item(
            checklist,
            "npm_tokens",
            "npm registry tokens",
            "文章确认第二阶段载荷会读取 ~/.npmrc,npm 发布/安装凭据应立即轮换。",
            [str(path)],
        )
    return findings, summary
|
|
|
|
|
def scan_kube(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Inspect kubeconfig-like files directly inside ``~/.kube``.

    A file is considered credential-bearing when it mentions a token, a
    password, inline client certificate/key data, an ``exec:`` credential
    plugin or an ``auth-provider:`` section.  Token values are only counted;
    they are not stored in the summary.

    Args:
        home: Home directory to inspect.
        checklist: Rotation checklist, updated in place for every
            credential-bearing file.

    Returns:
        ``(findings, summary)``.  ``summary["clusters"]`` holds up to 20
        distinct ``name:`` values seen in the files and ``summary["users"]``
        holds up to five ``"<redacted>"`` placeholders, one per inline user
        token found.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "config_files": [], "clusters": [], "users": []}
    kube_dir = home / ".kube"
    if not kube_dir.exists():
        return findings, summary
    summary["present"] = True

    # "exec:" and "auth-provider:" normally end their line, so they must not
    # be followed by a word-boundary assertion (":" + newline has none).
    credential_markers = re.compile(
        r"(?i)\b(?:token|password|client-certificate-data|client-key-data)\b"
        r"|\b(?:exec|auth-provider):"
    )
    names: list[str] = []
    inline_token_count = 0

    for path in safe_listdir(kube_dir):
        if not path.is_file():
            continue
        if path.name not in {"config"} and path.suffix.lower() not in {".yaml", ".yml", ".json", ".conf"}:
            continue
        text = safe_read_text(path)
        if not text:
            continue
        summary["config_files"].append(path.name)
        names.extend(re.findall(r"(?m)^\s*-?\s*name:\s*([A-Za-z0-9._:/-]+)", text))
        inline_token_count += len(re.findall(r"(?m)^\s*user:\s*\n(?:.+\n){0,8}?\s*token:\s*(\S+)", text))

        if credential_markers.search(text):
            findings.append(
                Finding(
                    category="credential_material",
                    severity="critical",
                    title="Kubernetes credential-bearing config found",
                    path=str(path),
                )
            )
            add_checklist_item(
                checklist,
                "kube_tokens",
                "Kubernetes kubeconfig / OIDC refresh tokens / client certs",
                "文章确认第二阶段载荷会读取 ~/.kube,集群凭据应按已暴露处理。",
                [str(path)],
            )

    summary["clusters"] = sorted(set(names))[:20]
    summary["users"] = ["<redacted>"] * min(inline_token_count, 5)
    return findings, summary
|
|
|
|
|
def scan_subversion(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Report cached Subversion credentials under ``~/.subversion/auth``.

    All regular files below the ``auth`` directory are listed (relative to
    ``~/.subversion``).  If any exist, one ``high`` finding is produced and
    the ``svn_credentials`` checklist item is added.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "files": []}

    svn_dir = home / ".subversion"
    if not svn_dir.exists():
        return findings, summary
    summary["present"] = True

    auth_dir = svn_dir / "auth"
    if auth_dir.exists():
        for entry in auth_dir.rglob("*"):
            if entry.is_file():
                summary["files"].append(str(entry.relative_to(svn_dir)))

    cached_files = summary["files"]
    if not cached_files:
        return findings, summary

    findings.append(
        Finding(
            category="credential_material",
            severity="high",
            title="Subversion auth cache present",
            path=str(auth_dir),
            details={"files": cached_files[:20]},
        )
    )
    add_checklist_item(
        checklist,
        "svn_credentials",
        "SVN credentials",
        "文章确认第二阶段载荷会读取 ~/.subversion,SVN 缓存凭据和对应账户密码需要轮换。",
        [str(auth_dir)],
    )
    return findings, summary
|
|
|
|
|
def _read_tail_text(path: Path, limit: int) -> str | None:
    """Return up to ``limit`` trailing bytes of ``path`` as lenient UTF-8 text.

    Returns ``None`` if ``path`` is not a regular file or cannot be read.
    When the file is longer than ``limit``, the partial first line of the
    window is dropped.
    """
    try:
        if not path.is_file():
            return None
        with path.open("rb") as fh:
            size = fh.seek(0, 2)  # whence=2: position relative to end of file
            start = max(size - limit, 0)
            fh.seek(start)
            chunk = fh.read(limit)
    except OSError:
        return None
    if start:
        # The window most likely begins mid-line; discard that fragment.
        _, newline, rest = chunk.partition(b"\n")
        if newline:
            chunk = rest
    return chunk.decode("utf-8", errors="ignore")


def inspect_text_file_for_secrets(path: Path, label: str, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Scan a shell rc/history file for lines matching ``SUSPICIOUS_PATTERNS``.

    Files whose name contains ``history`` are read from the END (the last
    ``MAX_TEXT_READ`` bytes, then at most the last ``MAX_HISTORY_LINES``
    lines) so that the most recent commands are the ones inspected.  Other
    files are read from the start via ``safe_read_text``.

    Args:
        path: File to inspect.
        label: Short name used in finding titles and checklist evidence.
        checklist: Rotation checklist, updated in place for each pattern hit.

    Returns:
        ``(findings, summary)``.  ``summary`` holds ``present``,
        ``pattern_counts`` (hits per pattern label) and ``suspicious_samples``
        (up to ``MAX_SUSPICIOUS_SAMPLES`` redacted lines).
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"present": False, "suspicious_samples": [], "pattern_counts": {}}

    is_history = "history" in path.name
    text = _read_tail_text(path, MAX_TEXT_READ) if is_history else safe_read_text(path)
    if text is None:
        return findings, summary
    summary["present"] = True

    lines = text.splitlines()
    if is_history:
        lines = lines[-MAX_HISTORY_LINES:]

    samples: list[dict[str, str]] = []
    pattern_counts: defaultdict[str, int] = defaultdict(int)
    for line in lines:
        for pattern_name, regex in SUSPICIOUS_PATTERNS:
            if regex.search(line):
                pattern_counts[pattern_name] += 1
                if len(samples) < MAX_SUSPICIOUS_SAMPLES:
                    samples.append({"pattern": pattern_name, "sample": redact_line(line.strip())})

    summary["pattern_counts"] = dict(pattern_counts)
    summary["suspicious_samples"] = samples

    if samples:
        findings.append(
            Finding(
                category="possible_exposed_secret",
                severity="high",
                title=f"Suspicious secrets found in {label}",
                path=str(path),
                details={"pattern_counts": dict(pattern_counts), "samples": samples},
            )
        )

    for pattern_name in sorted(pattern_counts):
        checklist_info = PATTERN_CHECKLIST_MAPPING.get(pattern_name)
        if not checklist_info:
            continue
        item_key, service = checklist_info
        add_checklist_item(
            checklist,
            item_key,
            service,
            f"{label} 中命中 `{pattern_name}`,如果该文件在 2026-03-04 至 2026-03-22 攻击窗口内存在,对应凭据建议轮换。",
            [str(path)],
        )
        add_checklist_evidence(
            checklist,
            item_key,
            [f"{label}: {pattern_name} x{pattern_counts[pattern_name]}"],
        )
    return findings, summary
|
|
|
|
|
def scan_shell_files(home: Path, checklist: dict[str, dict[str, object]]) -> tuple[list[Finding], dict[str, object]]:
    """Inspect ``.zsh_history``, ``.bash_history`` and ``.zshrc`` under ``home``.

    Each file is passed to ``inspect_text_file_for_secrets``.  A file that is
    present additionally yields a ``credential_material`` finding (``high``
    for history files, ``medium`` otherwise).  The summary maps each relative
    file name to its per-file summary.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {}

    for relative in (".zsh_history", ".bash_history", ".zshrc"):
        target = home / relative
        secret_findings, file_summary = inspect_text_file_for_secrets(target, relative, checklist)
        findings += secret_findings
        summary[relative] = file_summary

        if not file_summary.get("present"):
            continue
        findings.append(
            Finding(
                category="credential_material",
                severity="high" if "history" in relative else "medium",
                title=f"{relative} present and within documented exfiltration scope",
                path=str(target),
            )
        )
    return findings, summary
|
|
|
|
|
def scan_network_artifacts(home: Path) -> tuple[list[Finding], dict[str, object]]:
    """Search local log/config trees for IoC strings.

    ``Library/Logs``, ``.config`` and ``.apifox`` under ``home`` are walked
    recursively.  Per tree, the walk stops once more than 200 entries
    (files and directories alike) have been visited.  Only ``.log``,
    ``.json``, ``.txt`` and ``.ldb`` files are read, up to 512 KiB each.
    """
    findings: list[Finding] = []
    summary: dict[str, object] = {"checked_paths": [], "matches": []}
    wanted_suffixes = {".log", ".json", ".txt", ".ldb"}

    for base in (home / "Library/Logs", home / ".config", home / ".apifox"):
        if not base.exists():
            continue
        summary["checked_paths"].append(str(base))

        for visited, path in enumerate(base.rglob("*")):
            if visited > 200:
                break
            if not path.is_file() or path.suffix.lower() not in wanted_suffixes:
                continue
            text = safe_read_text(path, 512 * 1024)
            if not text:
                continue
            hits = sorted({ioc for ioc in IOC_STRINGS if ioc in text})
            if not hits:
                continue
            summary["matches"].append({"path": str(path), "hits": hits})
            findings.append(
                Finding(
                    category="network_or_log_ioc",
                    severity="medium",
                    title="Local log/config file contains attack-related IoCs",
                    path=str(path),
                    details={"hits": hits},
                )
            )
    return findings, summary
|
|
|
|
|
def compute_risk_score(findings: list[Finding]) -> str:
    """Collapse all findings into one overall risk label.

    Each finding contributes a weight (critical 4, high 3, medium 2, low or
    unknown 1).  The total maps to ``critical`` (>= 20), ``high`` (>= 10),
    ``medium`` (>= 4) or ``low``.
    """
    weights = {"critical": 4, "high": 3, "medium": 2, "low": 1}
    total = 0
    for finding in findings:
        total += weights.get(finding.severity, 1)

    for threshold, level in ((20, "critical"), (10, "high"), (4, "medium")):
        if total >= threshold:
            return level
    return "low"
|
|
|
|
|
def render_markdown_report(report: dict[str, object]) -> str:
    """Render the report dictionary as a Markdown document.

    The document has four sections: Summary, Rotation Checklist, Findings
    and Notes.  Findings of category ``possible_exposed_secret`` list their
    pattern counts and samples; other findings with details get a compact
    JSON dump.  The result always ends with a newline.
    """
    host = report["host"]
    overview = report["summary"]
    checklist = report["rotation_checklist"]

    out: list[str] = [
        "# Apifox Supply-Chain Leak Audit",
        "",
        "## Summary",
        "",
        f"- Host: `{host['hostname']}`",
        f"- Home: `{host['home']}`",
        f"- Platform: `{host['platform']}`",
        f"- Audit risk level: `{overview['risk_level']}`",
        f"- Findings: `{overview['finding_count']}`",
        f"- Rotation checklist items: `{len(checklist)}`",
        "",
        "## Rotation Checklist",
        "",
    ]

    if checklist:
        for item in checklist:
            out.append(f"- [ ] {item['service']}: {item['reason']}")
            if item["paths"]:
                out.append(f" Paths: {', '.join(item['paths'])}")
            if item.get("evidence"):
                out.append(f" Evidence: {', '.join(item['evidence'])}")
    else:
        out.append("- No obvious credential-bearing files were found in the documented exfiltration scope.")

    out += ["", "## Findings", ""]
    all_findings = report["findings"]
    if not all_findings:
        out.append("- No local IoCs or credential-bearing files were detected in the default scan paths.")
    else:
        for finding in all_findings:
            location = f" (`{finding['path']}`)" if finding.get("path") else ""
            out.append(f"- `{finding['severity']}` {finding['title']}{location}")
            details = finding.get("details") or {}

            if finding["category"] != "possible_exposed_secret":
                if details:
                    out.append(f" Details: `{json.dumps(details, ensure_ascii=False)}`")
                continue

            counts = details.get("pattern_counts") or {}
            out.extend(f" - Pattern `{name}`: `{count}` hit(s)" for name, count in counts.items())
            examples = details.get("samples") or []
            out.extend(f" - Sample `{example['pattern']}`: `{example['sample']}`" for example in examples)

    out += [
        "",
        "## Notes",
        "",
        "- This script does not print raw secret values; samples are redacted.",
        "- Absence of findings does not prove the host is safe. The article describes remote code execution capability, so a clean report is not a guarantee.",
    ]
    return "\n".join(out) + "\n"
|
|
|
|
|
def build_report(home: Path) -> dict[str, object]:
    """Run every scanner against ``home`` and assemble the full report.

    The report contains host information, an overall risk level, the
    findings ordered by severity and then title, the finalized rotation
    checklist, each scanner's summary and static context about the incident.
    """
    checklist: dict[str, dict[str, object]] = {}
    collected: list[Finding] = []
    scan_summaries: dict[str, object] = {}

    checklist_scanners = (
        ("apifox_storage", scan_apifox_storage),
        ("ssh", scan_ssh),
        ("git_credentials", scan_git_credentials),
        ("npmrc", scan_npmrc),
        ("kube", scan_kube),
        ("subversion", scan_subversion),
        ("shell_files", scan_shell_files),
    )
    for name, scanner in checklist_scanners:
        part_findings, part_summary = scanner(home, checklist)
        collected += part_findings
        scan_summaries[name] = part_summary

    # The network/log scanner does not contribute to the checklist.
    net_findings, net_summary = scan_network_artifacts(home)
    collected += net_findings
    scan_summaries["network_artifacts"] = net_summary

    severity_rank = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    collected.sort(key=lambda f: (severity_rank.get(f.severity, 4), f.title))

    hostname = platform.node()
    platform_text = platform.platform()
    host_info = {
        "hostname": hostname,
        "home": str(home),
        "platform": platform_text,
        "audit_fingerprint": sha256_text(f"{hostname}|{home}|{platform_text}"),
    }
    serialized_findings = [
        {
            "category": finding.category,
            "severity": finding.severity,
            "title": finding.title,
            "path": finding.path,
            "details": finding.details,
        }
        for finding in collected
    ]
    source_context = {
        "reference_article": "https://rce.moe/2026/03/25/apifox-supply-chain-attack-analysis/",
        "documented_attack_window": {
            "start": "2026-03-04",
            "end": "2026-03-22",
        },
        "documented_targets": [
            "~/.ssh/*",
            "~/.git-credentials",
            "~/.zsh_history",
            "~/.bash_history",
            "~/.zshrc",
            "~/.npmrc",
            "~/.kube/*",
            "~/.subversion/*",
            "Apifox Electron localStorage",
        ],
    }

    return {
        "host": host_info,
        "summary": {
            "risk_level": compute_risk_score(collected),
            "finding_count": len(collected),
        },
        "findings": serialized_findings,
        "rotation_checklist": finalize_checklist(checklist),
        "scan_summaries": scan_summaries,
        "source_context": source_context,
    }
|
|
|
|
|
def main() -> int:
    """Command-line entry point: scan a home directory and write both reports.

    The home directory is validated before scanning, because a mistyped
    ``--home`` would otherwise produce a misleading clean report.  Missing
    parent directories of the output files are created, so the scan result
    is not lost to a ``FileNotFoundError`` at write time.

    Returns:
        ``0`` on success.  Invalid arguments terminate through
        ``argparse``'s usual error exit.
    """
    parser = argparse.ArgumentParser(description="Audit local machine for possible credential exposure related to the Apifox supply-chain attack.")
    parser.add_argument("--home", type=Path, default=Path.home(), help="Home directory to inspect. Defaults to the current user's home.")
    parser.add_argument("--json-out", type=Path, default=Path("apifox_leak_audit_report.json"), help="Path to write the JSON report.")
    parser.add_argument("--md-out", type=Path, default=Path("apifox_leak_audit_report.md"), help="Path to write the Markdown checklist/report.")
    args = parser.parse_args()

    home = args.home.expanduser().resolve()
    if not home.is_dir():
        parser.error(f"--home must be an existing directory: {home}")

    report = build_report(home)

    for target in (args.json_out, args.md_out):
        target.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(report, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    args.md_out.write_text(render_markdown_report(report), encoding="utf-8")

    print(f"Wrote JSON report to {args.json_out}")
    print(f"Wrote Markdown report to {args.md_out}")
    print(f"Risk level: {report['summary']['risk_level']}")
    print(f"Findings: {report['summary']['finding_count']}")
    print(f"Checklist items: {len(report['rotation_checklist'])}")
    return 0
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)