Skip to content

Instantly share code, notes, and snippets.

@fryz
Created March 24, 2026 20:53
Show Gist options
  • Select an option

  • Save fryz/93ec8d4898ffe5b5ac5706a208823ef3 to your computer and use it in GitHub Desktop.

Select an option

Save fryz/93ec8d4898ffe5b5ac5706a208823ef3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Scan all GitHub Actions jobs in the arthur-ai org that ran between
0800-1244 UTC today and identify any that installed litellm 1.82.7 or 1.82.8.
"""
import io
import os
import re
import sys
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import requests
# --- Configuration ----------------------------------------------------------
GITHUB_URL = "https://api.github.com"  # GitHub REST API base URL
ORG = "arthur-ai"  # organization whose repos/workflows are scanned
TOKEN = os.environ.get("GITHUB_TOKEN", "")  # PAT; presumably needs actions:read — confirm scope
# Scan window: 08:00-12:44 UTC of the current day (see module docstring).
TODAY = datetime.now(timezone.utc).date()
WINDOW_START = datetime(TODAY.year, TODAY.month, TODAY.day, 8, 0, 0, tzinfo=timezone.utc)
WINDOW_END = datetime(TODAY.year, TODAY.month, TODAY.day, 12, 44, 0, tzinfo=timezone.utc)
# litellm versions being hunted, and the pattern that pulls a version out of a
# log line.  NOTE(review): the separator class [=\-] matches "litellm-1.82.7"
# (pip's "Successfully installed" line) and "litellm=1.82.7", but NOT the
# double-equals pin "litellm==1.82.7" — confirm this is intentional.
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
VERSION_PATTERN = re.compile(r"litellm[=\-](\d+\.\d+\.\d+)", re.IGNORECASE)
# Shared HTTP session so every request carries auth + API-version headers.
SESSION = requests.Session()
SESSION.headers.update({
"Authorization": f"Bearer {TOKEN}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
})
def get_paginated(url, params=None):
    """Yield every item from a paginated GitHub list endpoint.

    Follows ``page``/``per_page`` pagination until an empty or short page
    is returned.  A 404 response ends the generator silently; any other
    HTTP error is raised.  Some GitHub endpoints wrap the list inside a
    top-level object, in which case the first list-valued field is used.
    """
    query = dict(params or {})
    query.setdefault("per_page", 100)
    page_size = query["per_page"]
    page_num = 1
    while True:
        query["page"] = page_num
        response = SESSION.get(url, params=query, timeout=30)
        if response.status_code == 404:
            return
        response.raise_for_status()
        payload = response.json()
        if isinstance(payload, dict):
            # Dict responses wrap the actual list under some top-level key.
            batch = next((v for v in payload.values() if isinstance(v, list)), [])
        else:
            batch = payload
        if not batch:
            break
        yield from batch
        if len(batch) < page_size:
            break
        page_num += 1
def parse_ts(ts_str):
    """Parse an ISO-8601 timestamp (possibly 'Z'-suffixed) into an aware
    datetime; return None for a falsy input."""
    if ts_str:
        return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    return None
def get_repos():
    """Return [{id, name, full_name}] for every repository in the org."""
    return [
        {"id": repo["id"], "name": repo["name"], "full_name": repo["full_name"]}
        for repo in get_paginated(f"{GITHUB_URL}/orgs/{ORG}/repos", {"type": "all"})
    ]
def get_runs_in_window(repo_full_name):
    """Return workflow runs started (or created) inside the scan window.

    The ``created`` query param narrows the server-side result set; the
    client-side timestamp check is the authoritative filter.
    """
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    created_range = f"{WINDOW_START.strftime(fmt)}..{WINDOW_END.strftime(fmt)}"
    endpoint = f"{GITHUB_URL}/repos/{repo_full_name}/actions/runs"
    selected = []
    for run in get_paginated(endpoint, {"created": created_range, "per_page": 100}):
        when = parse_ts(run.get("run_started_at") or run.get("created_at"))
        if when is not None and WINDOW_START <= when <= WINDOW_END:
            selected.append(run)
    return selected
def get_jobs_for_run(repo_full_name, run_id):
    """Return the run's jobs whose started_at falls inside the scan window."""
    endpoint = f"{GITHUB_URL}/repos/{repo_full_name}/actions/runs/{run_id}/jobs"
    in_window = []
    for job in get_paginated(endpoint, {"filter": "all"}):
        when = parse_ts(job.get("started_at"))
        if when is not None and WINDOW_START <= when <= WINDOW_END:
            in_window.append(job)
    return in_window
def fetch_job_log(repo_full_name, job_id):
    """Download a job's log and return it as plain text.

    GitHub answers with a 302 redirect to a zip archive; the redirect is
    followed automatically.  Returns "" when the log is forbidden or gone
    (403/404/410).  When the payload is a zip, all member files are
    concatenated in sorted name order; otherwise the raw text is returned.
    """
    endpoint = f"{GITHUB_URL}/repos/{repo_full_name}/actions/jobs/{job_id}/logs"
    resp = SESSION.get(endpoint, timeout=60, allow_redirects=True)
    if resp.status_code in (403, 404, 410):
        return ""
    resp.raise_for_status()
    # Detect zip either by header or by the "PK" magic bytes.
    looks_like_zip = (
        "zip" in resp.headers.get("Content-Type", "")
        or resp.content[:2] == b"PK"
    )
    if looks_like_zip:
        try:
            with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
                texts = []
                for member in sorted(archive.namelist()):
                    with archive.open(member) as handle:
                        texts.append(handle.read().decode("utf-8", errors="replace"))
                return "\n".join(texts)
        except zipfile.BadZipFile:
            pass  # fall through and treat the body as plain text
    return resp.text
def check_job(repo_full_name, job):
    """Scan one GitHub Actions job's log for the target litellm versions.

    Returns a result dict (repo, run/job ids, versions, up to 10 matching
    log lines, and a job URL) when a target version appears in the log,
    otherwise None (also when the log is empty/unavailable).

    Fixes vs. original: removed the unused ``repo_name`` local and the
    duplicated ``m.group(1)`` extraction.
    """
    job_id = job["id"]
    run_id = job["run_id"]
    log_text = fetch_job_log(repo_full_name, job_id)
    if not log_text:
        return None
    found_versions = set()
    context_lines = []
    for line in log_text.splitlines():
        m = VERSION_PATTERN.search(line)
        if m and m.group(1) in TARGET_VERSIONS:
            found_versions.add(m.group(1))
            context_lines.append(line.strip())
    if not found_versions:
        return None
    return {
        "repo": repo_full_name,
        "run_id": run_id,
        "job_id": job_id,
        "job_name": job["name"],
        "started_at": job.get("started_at", ""),
        "versions": sorted(found_versions),
        "context": context_lines[:10],  # cap the report at 10 lines
        "job_url": job.get("html_url", f"https://github.com/{repo_full_name}/actions/runs/{run_id}"),
    }
def main():
    """Entry point: enumerate org repos, collect jobs in the window, scan logs.

    Requires GITHUB_TOKEN in the environment; exits with status 1 when missing.
    """
    if not TOKEN:
        print("ERROR: Set GITHUB_TOKEN environment variable.", file=sys.stderr)
        sys.exit(1)
    # FIX: the separator here was mojibake ("‚Üí", apparently a mis-encoded
    # arrow character); replaced with plain ASCII.
    print(f"Time window : {WINDOW_START.isoformat()} -> {WINDOW_END.isoformat()}")
    print(f"Hunting for : litellm {', '.join(sorted(TARGET_VERSIONS))}")
    print()
    print(f"Fetching repositories for org '{ORG}'...")
    repos = get_repos()
    print(f" Found {len(repos)} repositories")
    print()
    # Collect all (repo_full_name, job) pairs within the window.
    jobs_to_check = []
    print("Scanning workflow runs for time window...")
    for repo in repos:
        full_name = repo["full_name"]
        try:
            runs = get_runs_in_window(full_name)
        except requests.HTTPError as e:
            print(f" WARN: {full_name} — {e}", file=sys.stderr)
            continue
        if not runs:
            continue
        print(f" {full_name}: {len(runs)} run(s) in window")
        for run in runs:
            try:
                jobs = get_jobs_for_run(full_name, run["id"])
            except requests.HTTPError as e:
                print(f" WARN: run {run['id']} — {e}", file=sys.stderr)
                continue
            for job in jobs:
                jobs_to_check.append((full_name, job))
    total = len(jobs_to_check)
    print(f"\nFetching logs for {total} job(s)...")
    print()
    hits = []
    # Log downloads are network-bound, so fan out across a thread pool.
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = {
            pool.submit(check_job, full_name, job): (full_name, job["id"])
            for full_name, job in jobs_to_check
        }
        done = 0
        for future in as_completed(futures):
            done += 1
            full_name, jid = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f" ERROR {full_name} job {jid}: {e}", file=sys.stderr)
                continue
            if result:
                hits.append(result)
            print(
                f" [{done}/{total}] {full_name} job {jid}" +
                (f" *** HIT: litellm {result['versions']} ***" if result else ""),
                flush=True,
            )
    # ── Summary ──────────────────────────────────────────────────────────────
    print()
    print("=" * 72)
    print(f"RESULTS: {len(hits)} job(s) installed litellm {' or '.join(sorted(TARGET_VERSIONS))}")
    print("=" * 72)
    if not hits:
        print("No matches found.")
        return
    for h in sorted(hits, key=lambda x: x["started_at"]):
        print()
        print(f" Repo : {h['repo']}")
        print(f" Job : {h['job_name']} (#{h['job_id']})")
        print(f" Run ID : {h['run_id']}")
        print(f" Started : {h['started_at']}")
        print(f" Versions : litellm {', '.join(h['versions'])}")
        print(f" URL : {h['job_url']}")
        print(" Log lines :")  # FIX: was an f-string with no placeholders
        for line in h["context"]:
            print(f" {line}")
if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Scan all GitLab CI/CD jobs in the ArthurAI group that ran between 0800-1244 UTC today
and identify any that installed litellm 1.82.7 or 1.82.8.
"""
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import requests
# --- Configuration ----------------------------------------------------------
GITLAB_URL = "https://gitlab.com"  # SaaS GitLab base URL
GROUP_NAME = "ArthurAI"  # top-level group to scan (subgroups included below)
TOKEN = os.environ.get("GITLAB_TOKEN", "")  # PAT; presumably needs read_api — confirm scope
# Time window (UTC today)
TODAY = datetime.now(timezone.utc).date()
WINDOW_START = datetime(TODAY.year, TODAY.month, TODAY.day, 8, 0, 0, tzinfo=timezone.utc)
WINDOW_END = datetime(TODAY.year, TODAY.month, TODAY.day, 12, 44, 0, tzinfo=timezone.utc)
# Versions to hunt for.  NOTE(review): [=\-] matches "litellm-1.82.7" and
# "litellm=1.82.7" but NOT the pip pin "litellm==1.82.7" — confirm intended.
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
VERSION_PATTERN = re.compile(r"litellm[=\-](\d+\.\d+\.\d+)", re.IGNORECASE)
HEADERS = {"PRIVATE-TOKEN": TOKEN}  # GitLab auth header
# Shared session so every request carries the auth header.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
def get_paginated(url, params=None):
    """Yield all items across paginated GitLab API responses.

    Pages through ``page``/``per_page`` until an empty or short page is
    returned; HTTP errors are raised via ``raise_for_status``.
    """
    query = dict(params or {})
    query.setdefault("per_page", 100)
    current_page = 1
    while True:
        query["page"] = current_page
        response = SESSION.get(url, params=query, timeout=30)
        response.raise_for_status()
        items = response.json()
        if not items:
            break
        yield from items
        if len(items) < query["per_page"]:
            break
        current_page += 1
def get_group_id(group_name):
    """Resolve a GitLab group name/path to its numeric ID."""
    response = SESSION.get(f"{GITLAB_URL}/api/v4/groups/{group_name}", timeout=30)
    response.raise_for_status()
    return response.json()["id"]
def get_all_projects(group_id):
    """Return all non-archived projects (including subgroups) under the group."""
    listing_url = f"{GITLAB_URL}/api/v4/groups/{group_id}/projects"
    listing_opts = {"include_subgroups": "true", "archived": "false"}
    return [
        {"id": proj["id"], "name": proj["path_with_namespace"]}
        for proj in get_paginated(listing_url, listing_opts)
    ]
def parse_ts(ts_str):
    """Parse a GitLab ISO-8601 timestamp ('Z' or '+00:00' suffixed) into an
    aware datetime; return None for empty/None input."""
    # GitLab returns ISO 8601; normalize the 'Z' suffix for fromisoformat.
    return datetime.fromisoformat(ts_str.replace("Z", "+00:00")) if ts_str else None
def jobs_in_window(project_id):
    """Collect the project's jobs that started inside the scan window.

    Jobs are listed newest-first, so pagination stops once a page contains
    a job older than WINDOW_START (or a short/empty page).  Returns the
    matches collected so far on HTTP 403 (no access to the project's jobs).
    """
    jobs_url = f"{GITLAB_URL}/api/v4/projects/{project_id}/jobs"
    query = {"per_page": 100, "scope[]": ["success", "failed", "canceled", "running"]}
    in_window = []
    page = 1
    while True:
        query["page"] = page
        response = SESSION.get(jobs_url, params=query, timeout=30)
        if response.status_code == 403:
            return in_window  # no access to this project's jobs
        response.raise_for_status()
        batch = response.json()
        if not batch:
            break
        saw_older = False
        for job in batch:
            # Prefer started_at; queued-only jobs fall back to created_at.
            when = parse_ts(job.get("started_at") or job.get("created_at"))
            if when is None or when > WINDOW_END:
                continue  # unknown or too recent — keep scanning this page
            if when < WINDOW_START:
                saw_older = True  # everything past this point is older
            else:
                in_window.append(job)
        if saw_older or len(batch) < 100:
            break
        page += 1
    return in_window
def fetch_trace(project_id, job_id):
    """Return the raw text trace for a job, or "" on 403/404."""
    trace_url = f"{GITLAB_URL}/api/v4/projects/{project_id}/jobs/{job_id}/trace"
    response = SESSION.get(trace_url, timeout=60)
    if response.status_code in (403, 404):
        return ""
    response.raise_for_status()
    return response.text
def check_job(project_name, project_id, job):
    """Fetch one GitLab job's trace and report any target litellm versions.

    Returns a result dict (project, job, versions, up to 10 matching log
    lines, and a job URL), or None when the trace is empty/unavailable or
    contains no target version.

    Fix vs. original: the trace was scanned twice (once with finditer over
    the whole text, then again line-by-line with two searches per line);
    a single line-by-line pass now collects versions and context together.
    """
    job_id = job["id"]
    trace = fetch_trace(project_id, job_id)
    if not trace:
        return None
    found_versions = set()
    context_lines = []
    for line in trace.splitlines():
        matched = [
            m.group(1)
            for m in VERSION_PATTERN.finditer(line)
            if m.group(1) in TARGET_VERSIONS
        ]
        if matched:
            found_versions.update(matched)
            context_lines.append(line.strip())
    if not found_versions:
        return None
    return {
        "project": project_name,
        "project_id": project_id,
        "job_id": job_id,
        "job_name": job["name"],
        "ref": job.get("ref", ""),
        "started_at": job.get("started_at", job.get("created_at", "")),
        "versions": sorted(found_versions),
        "context": context_lines[:10],  # cap at 10 lines
        "job_url": f"{GITLAB_URL}/{project_name}/-/jobs/{job_id}",
    }
def main():
    """Entry point: resolve the group, list projects and jobs, scan traces.

    Requires GITLAB_TOKEN in the environment; exits with status 1 when missing.
    """
    if not TOKEN:
        print("ERROR: Set GITLAB_TOKEN environment variable.", file=sys.stderr)
        sys.exit(1)
    # FIX: the separator here was mojibake ("‚Üí", apparently a mis-encoded
    # arrow character); replaced with plain ASCII.
    print(f"Time window : {WINDOW_START.isoformat()} -> {WINDOW_END.isoformat()}")
    print(f"Hunting for : litellm {', '.join(sorted(TARGET_VERSIONS))}")
    print()
    print(f"Resolving group '{GROUP_NAME}'...")
    group_id = get_group_id(GROUP_NAME)
    print("Fetching projects...")
    projects = get_all_projects(group_id)
    print(f" Found {len(projects)} projects")
    print()
    # Collect all jobs in window across all projects.
    all_jobs_to_check = []  # list of (project_name, project_id, job)
    print("Scanning job listings for time window...")
    for proj in projects:
        try:
            jobs = jobs_in_window(proj["id"])
        except requests.HTTPError as e:
            print(f" WARN: {proj['name']} — {e}", file=sys.stderr)
            continue
        if jobs:
            print(f" {proj['name']}: {len(jobs)} job(s) in window")
            for j in jobs:
                all_jobs_to_check.append((proj["name"], proj["id"], j))
    total = len(all_jobs_to_check)
    print(f"\nFetching traces for {total} job(s)...")
    print()
    hits = []
    # Trace downloads are network-bound, so fan out across a thread pool.
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = {
            pool.submit(check_job, pname, pid, job): (pname, job["id"])
            for pname, pid, job in all_jobs_to_check
        }
        done = 0
        for future in as_completed(futures):
            done += 1
            pname, jid = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f" ERROR checking {pname} job {jid}: {e}", file=sys.stderr)
                continue
            if result:
                hits.append(result)
            # Simple progress line.
            print(f" [{done}/{total}] checked {pname} job {jid}" +
                  (f" *** HIT: litellm {result['versions']} ***" if result else ""),
                  flush=True)
    # ── Summary ──────────────────────────────────────────────────────────────
    print()
    print("=" * 72)
    print(f"RESULTS: {len(hits)} job(s) installed litellm {' or '.join(sorted(TARGET_VERSIONS))}")
    print("=" * 72)
    if not hits:
        print("No matches found.")
        return
    for h in sorted(hits, key=lambda x: x["started_at"]):
        print()
        print(f" Project : {h['project']}")
        print(f" Job : {h['job_name']} (#{h['job_id']})")
        print(f" Branch/tag: {h['ref']}")
        print(f" Started : {h['started_at']}")
        print(f" Versions : litellm {', '.join(h['versions'])}")
        print(f" URL : {h['job_url']}")
        print(" Log lines :")  # FIX: was an f-string with no placeholders
        for line in h["context"]:
            print(f" {line}")
if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Scan the local Mac filesystem (run with sudo) for installed litellm 1.82.7 / 1.82.8.
Searches every Python environment on the machine:
- System / Homebrew / pyenv / conda / virtualenvs / pipx / user installs
Strategy: find all litellm-*.dist-info directories, read their METADATA
to confirm the exact version, then report the containing environment.
"""
import os
import re
import subprocess
import sys
from pathlib import Path
# litellm versions considered vulnerable for this scan.
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
# Directories to skip — either non-filesystem pseudo-paths or known APFS
# duplicates that would double-count everything under /
PRUNE_PATHS = {
"/System/Volumes/Data", # APFS data volume (same content as /)
"/System/Volumes/VM",
"/System/Volumes/Preboot",
"/System/Volumes/Recovery",
"/System/Volumes/Update",
"/private/var/vm",
"/dev",
"/proc",
"/.Spotlight-V100",
"/.fseventsd",
"/.DocumentRevisions-V100",
}
# Matches the "Version: X.Y.Z" line of a wheel METADATA / PKG-INFO file.
VERSION_RE = re.compile(r"^Version:\s*(.+)$", re.MULTILINE | re.IGNORECASE)
def find_dist_info_dirs():
    """Locate every litellm-*.dist-info directory on the filesystem.

    Shells out to the system ``find`` binary rooted at "/", pruning the
    pseudo/duplicate paths in PRUNE_PATHS for speed.  Non-permission
    errors from ``find`` are echoed to stderr; "Permission denied" noise
    (expected on system dirs) is suppressed.  Exits 1 if ``find`` is
    missing.
    """
    cmd = ["find", "/"]
    for pruned in sorted(PRUNE_PATHS):
        cmd += ["-path", pruned, "-prune", "-o"]
    cmd += ["-type", "d", "-name", "litellm-*.dist-info", "-print"]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        print("ERROR: `find` not found.", file=sys.stderr)
        sys.exit(1)
    # Surface only unexpected find errors (permission noise is expected).
    for err_line in proc.stderr.splitlines():
        if "Permission denied" not in err_line:
            print(f" find: {err_line}", file=sys.stderr)
    return [line.strip() for line in proc.stdout.splitlines() if line.strip()]
def read_metadata_version(dist_info_dir):
    """Return the package version recorded in a dist-info directory.

    Reads the ``Version:`` field from METADATA (or the legacy PKG-INFO),
    falling back to the version embedded in the directory name itself
    (e.g. ``litellm-1.82.7.dist-info`` -> ``"1.82.7"``).  Returns None
    when no version can be determined.

    Fixes vs. original: EAFP read instead of exists()+read (avoids a
    TOCTOU double-stat), explicit utf-8 decoding (wheel metadata is
    UTF-8), and a locally compiled pattern so the function is
    self-contained.
    """
    # Wheel metadata is a simple RFC-822-style "Key: value" format.
    version_field = re.compile(r"^Version:\s*(.+)$", re.MULTILINE | re.IGNORECASE)
    info_dir = Path(dist_info_dir)
    for fname in ("METADATA", "PKG-INFO"):
        try:
            text = (info_dir / fname).read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue  # missing or unreadable — try the next candidate
        m = version_field.search(text)
        if m:
            return m.group(1).strip()
    # Fall back to parsing the directory name itself (litellm-1.82.7.dist-info).
    parts = info_dir.name.replace(".dist-info", "").split("-")
    return parts[1] if len(parts) >= 2 else None
def classify_environment(dist_info_dir):
    """Guess the Python environment type that owns a dist-info directory.

    Walks upward from the site-packages directory (the dist-info's parent)
    looking for a ``bin/python3`` or ``bin/python`` executable to identify
    the environment root, then classifies the environment from path markers
    (homebrew, pyenv, conda, pipx, virtualenv, system, ...).  Also extracts
    the owning username when the path lives under /Users/.
    """
    resolved = Path(dist_info_dir).resolve()
    site_pkgs = resolved.parent  # dist-info sits directly inside site-packages
    # Search up to six ancestor levels for an interpreter binary.
    env_root = None
    probe = site_pkgs
    for _ in range(6):
        probe = probe.parent
        interpreter = probe / "bin" / "python3"
        if not interpreter.exists():
            interpreter = probe / "bin" / "python"
        if interpreter.exists():
            env_root = probe
            break
    path_str = str(site_pkgs)
    lowered = path_str.lower()
    if "/homebrew/" in lowered or "/cellar/" in lowered:
        env_type = "homebrew"
    elif "/.pyenv/" in path_str:
        env_type = "pyenv"
    elif "/miniconda" in path_str or "/anaconda" in path_str or "/conda/" in path_str:
        env_type = "conda"
    elif "/pipx/" in path_str:
        env_type = "pipx"
    elif "/venv/" in path_str or "/.venv/" in path_str or "/virtualenv" in path_str:
        env_type = "virtualenv"
    elif "/Library/Python/" in path_str:
        env_type = "system-user" if "/Users/" in path_str else "system"
    elif path_str.startswith("/usr/local/lib"):
        env_type = "usr-local"
    elif path_str.startswith("/usr/lib"):
        env_type = "system"
    elif "/site-packages" in path_str:
        env_type = "virtualenv/other"
    else:
        env_type = "unknown"
    # The first path component after /Users/ is taken as the owning user.
    owner = None
    if "/Users/" in path_str:
        after_users = path_str.split("/Users/")
        if len(after_users) > 1:
            owner = after_users[1].split("/")[0]
    return {
        "env_type": env_type,
        "env_root": str(env_root) if env_root else str(site_pkgs),
        "site_pkgs": str(site_pkgs),
        "owner": owner,
    }
def main():
    """Locate every litellm install on this machine and flag vulnerable ones.

    Returns 1 when a vulnerable version is found, 0 otherwise (used as the
    process exit code by the __main__ guard).
    """
    if os.geteuid() != 0:
        print("WARNING: Not running as root — some paths may be inaccessible.")
        print(" Re-run with: sudo python3 scan_litellm_local.py\n")
    print("Scanning filesystem for litellm dist-info directories...")
    print("(This may take 30-90 seconds on a full disk scan)\n")
    dist_dirs = find_dist_info_dirs()
    print(f"Found {len(dist_dirs)} litellm dist-info director{'y' if len(dist_dirs)==1 else 'ies'} total.\n")
    # Partition the installs into vulnerable hits and everything else.
    hits, others = [], []
    for dist_dir in dist_dirs:
        record = {
            "path": dist_dir,
            "version": read_metadata_version(dist_dir),
            "env": classify_environment(dist_dir),
        }
        (hits if record["version"] in TARGET_VERSIONS else others).append(record)
    # ── Report ────────────────────────────────────────────────────────────────
    print("=" * 72)
    print(f"VULNERABLE VERSIONS FOUND: {len(hits)}")
    print(f" (litellm {' / '.join(sorted(TARGET_VERSIONS))})")
    print("=" * 72)
    if hits:
        for record in hits:
            env = record["env"]
            print()
            print(f" Version : litellm {record['version']} *** MATCH ***")
            print(f" Env type : {env['env_type']}")
            print(f" Owner user : {env['owner'] or '(system)'}")
            print(f" Env root : {env['env_root']}")
            print(f" Site-pkgs : {env['site_pkgs']}")
            print(f" dist-info : {record['path']}")
    else:
        print("\n No installations of litellm 1.82.7 or 1.82.8 found.")
    if others:
        print()
        print("─" * 72)
        print(f"Other litellm versions present on this machine ({len(others)}):")
        for record in sorted(others, key=lambda x: (x["version"] or "", x["path"])):
            env = record["env"]
            owner_tag = f" [{env['owner']}]" if env["owner"] else ""
            print(f" litellm {record['version'] or '(unknown)':12s} {env['env_type']:18s}{owner_tag}")
            print(f" {record['env']['site_pkgs']}")
    print()
    print(f"Scan complete. {len(hits)} vulnerable installation(s) found.")
    return 1 if hits else 0
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment