Created
March 24, 2026 20:53
-
-
Save fryz/93ec8d4898ffe5b5ac5706a208823ef3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Scan all GitHub Actions jobs in the arthur-ai org that ran between | |
| 0800-1244 UTC today and identify any that installed litellm 1.82.7 or 1.82.8. | |
| """ | |
| import io | |
| import os | |
| import re | |
| import sys | |
| import zipfile | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timezone | |
| import requests | |
GITHUB_URL = "https://api.github.com"
ORG = "arthur-ai"
# Personal access token with repo/actions read scope; empty string when unset
# (main() aborts in that case).
TOKEN = os.environ.get("GITHUB_TOKEN", "")
# Scan window: 08:00-12:44 UTC on the current day.
TODAY = datetime.now(timezone.utc).date()
WINDOW_START = datetime(TODAY.year, TODAY.month, TODAY.day, 8, 0, 0, tzinfo=timezone.utc)
WINDOW_END = datetime(TODAY.year, TODAY.month, TODAY.day, 12, 44, 0, tzinfo=timezone.utc)
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
# Match both "litellm-1.82.7" (wheel / "Successfully installed" names) and
# "litellm==1.82.7" (pip requirement specs). The previous pattern consumed a
# single "=" and then required a digit, so the "==" spelling never matched.
VERSION_PATTERN = re.compile(r"litellm[=\-]=?(\d+\.\d+\.\d+)", re.IGNORECASE)
# One shared HTTP session so every request carries the auth token and the
# pinned REST API version header.
SESSION = requests.Session()
SESSION.headers.update({
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json",
    "X-GitHub-Api-Version": "2022-11-28",
})
def get_paginated(url, params=None):
    """
    Yield every item from a paginated GitHub endpoint.

    Handles both bare-list responses and the dict-wrapped form (e.g.
    {"total_count": N, "workflow_runs": [...]}) by picking the first list
    value found. A 404 silently ends iteration (repo without Actions, etc.).
    """
    query = dict(params or {})
    query.setdefault("per_page", 100)
    page = 1
    while True:
        query["page"] = page
        resp = SESSION.get(url, params=query, timeout=30)
        if resp.status_code == 404:
            return
        resp.raise_for_status()
        payload = resp.json()
        if isinstance(payload, dict):
            # Dict-wrapped response: the items live under some list-valued key.
            items = next((v for v in payload.values() if isinstance(v, list)), [])
        else:
            items = payload
        if not items:
            return
        yield from items
        # A short page means we just consumed the last one.
        if len(items) < query["per_page"]:
            return
        page += 1
def parse_ts(ts_str):
    """Parse an ISO-8601 timestamp into an aware datetime; None for empty input."""
    if ts_str:
        # Normalise the trailing "Z" so fromisoformat accepts it on older Pythons.
        return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    return None
def get_repos():
    """Return [{id, name, full_name}, ...] for every repository in the org."""
    url = f"{GITHUB_URL}/orgs/{ORG}/repos"
    return [
        {"id": repo["id"], "name": repo["name"], "full_name": repo["full_name"]}
        for repo in get_paginated(url, {"type": "all"})
    ]
def get_runs_in_window(repo_full_name):
    """
    Return workflow runs that started (or, lacking that, were created) inside
    the scan window. The `created` query param narrows the result server-side;
    the client-side check then filters on the actual start time.
    """
    stamp = "%Y-%m-%dT%H:%M:%SZ"
    created = f"{WINDOW_START.strftime(stamp)}..{WINDOW_END.strftime(stamp)}"
    url = f"{GITHUB_URL}/repos/{repo_full_name}/actions/runs"
    in_window = []
    for run in get_paginated(url, {"created": created, "per_page": 100}):
        started = parse_ts(run.get("run_started_at") or run.get("created_at"))
        if started is not None and WINDOW_START <= started <= WINDOW_END:
            in_window.append(run)
    return in_window
def get_jobs_for_run(repo_full_name, run_id):
    """Return the run's jobs whose started_at lies inside the scan window."""
    url = f"{GITHUB_URL}/repos/{repo_full_name}/actions/runs/{run_id}/jobs"
    selected = []
    for job in get_paginated(url, {"filter": "all"}):
        started = parse_ts(job.get("started_at"))
        if started is not None and WINDOW_START <= started <= WINDOW_END:
            selected.append(job)
    return selected
def fetch_job_log(repo_full_name, job_id):
    """
    Download a job's log and return it as plain text.

    GitHub answers with a 302 redirect to a zip of per-step logs; when a zip
    arrives, its members are decoded and concatenated in name order. Returns
    "" for missing, expired, or forbidden logs (403/404/410).
    """
    url = f"{GITHUB_URL}/repos/{repo_full_name}/actions/jobs/{job_id}/logs"
    resp = SESSION.get(url, timeout=60, allow_redirects=True)
    if resp.status_code in (403, 404, 410):
        return ""
    resp.raise_for_status()
    body = resp.content
    # Detect a zip payload by header or magic bytes ("PK").
    looks_zipped = "zip" in resp.headers.get("Content-Type", "") or body[:2] == b"PK"
    if looks_zipped:
        try:
            with zipfile.ZipFile(io.BytesIO(body)) as archive:
                pieces = [
                    archive.read(member).decode("utf-8", errors="replace")
                    for member in sorted(archive.namelist())
                ]
            return "\n".join(pieces)
        except zipfile.BadZipFile:
            pass  # fall through and treat the payload as plain text
    return resp.text
def check_job(repo_full_name, job):
    """
    Download one job's log and scan it for target litellm versions.

    Returns a summary dict (repo, ids, versions, up to 10 matching log lines)
    when a target version appears, else None. The unused local `repo_name`
    from the original was removed.
    """
    job_id = job["id"]
    run_id = job["run_id"]
    log_text = fetch_job_log(repo_full_name, job_id)
    if not log_text:
        return None  # log expired, inaccessible, or empty
    found_versions = set()
    context_lines = []
    for line in log_text.splitlines():
        m = VERSION_PATTERN.search(line)
        if m and m.group(1) in TARGET_VERSIONS:
            found_versions.add(m.group(1))
            context_lines.append(line.strip())
    if not found_versions:
        return None
    return {
        "repo": repo_full_name,
        "run_id": run_id,
        "job_id": job_id,
        "job_name": job["name"],
        "started_at": job.get("started_at", ""),
        "versions": sorted(found_versions),
        "context": context_lines[:10],  # cap the report at 10 log lines
        "job_url": job.get("html_url", f"https://github.com/{repo_full_name}/actions/runs/{run_id}"),
    }
def main():
    """
    Entry point: enumerate repos, their runs and jobs in the scan window, then
    fetch each job log concurrently and report litellm target-version hits.
    """
    if not TOKEN:
        print("ERROR: Set GITHUB_TOKEN environment variable.", file=sys.stderr)
        sys.exit(1)
    # NOTE: the arrow below was previously mojibake ("‚Üí") from a UTF-8 "→"
    # decoded with the wrong codec; restored to the intended character.
    print(f"Time window : {WINDOW_START.isoformat()} → {WINDOW_END.isoformat()}")
    print(f"Hunting for : litellm {', '.join(sorted(TARGET_VERSIONS))}")
    print()
    print(f"Fetching repositories for org '{ORG}'...")
    repos = get_repos()
    print(f" Found {len(repos)} repositories")
    print()
    # Collect all (repo, job) pairs within the window
    jobs_to_check = []  # list of (repo_full_name, job_dict)
    print("Scanning workflow runs for time window...")
    for repo in repos:
        full_name = repo["full_name"]
        try:
            runs = get_runs_in_window(full_name)
        except requests.HTTPError as e:
            # Keep scanning the rest of the org on per-repo API failures.
            print(f" WARN: {full_name} — {e}", file=sys.stderr)
            continue
        if not runs:
            continue
        print(f" {full_name}: {len(runs)} run(s) in window")
        for run in runs:
            try:
                jobs = get_jobs_for_run(full_name, run["id"])
            except requests.HTTPError as e:
                print(f" WARN: run {run['id']} — {e}", file=sys.stderr)
                continue
            for job in jobs:
                jobs_to_check.append((full_name, job))
    total = len(jobs_to_check)
    print(f"\nFetching logs for {total} job(s)...")
    print()
    hits = []
    # Log download dominates runtime, so fetch/scan jobs concurrently.
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = {
            pool.submit(check_job, full_name, job): (full_name, job["id"])
            for full_name, job in jobs_to_check
        }
        done = 0
        for future in as_completed(futures):
            done += 1
            full_name, jid = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f" ERROR {full_name} job {jid}: {e}", file=sys.stderr)
                continue
            if result:
                hits.append(result)
            print(
                f" [{done}/{total}] {full_name} job {jid}" +
                (f" *** HIT: litellm {result['versions']} ***" if result else ""),
                flush=True,
            )
    # ── Summary ──────────────────────────────────────────────────────────────
    print()
    print("=" * 72)
    print(f"RESULTS: {len(hits)} job(s) installed litellm {' or '.join(sorted(TARGET_VERSIONS))}")
    print("=" * 72)
    if not hits:
        print("No matches found.")
        return
    for h in sorted(hits, key=lambda x: x["started_at"]):
        print()
        print(f" Repo : {h['repo']}")
        print(f" Job : {h['job_name']} (#{h['job_id']})")
        print(f" Run ID : {h['run_id']}")
        print(f" Started : {h['started_at']}")
        print(f" Versions : litellm {', '.join(h['versions'])}")
        print(f" URL : {h['job_url']}")
        print(" Log lines :")
        for line in h["context"]:
            print(f" {line}")
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Scan all GitLab CI/CD jobs in the ArthurAI group that ran between 0800-1244 UTC today | |
| and identify any that installed litellm 1.82.7 or 1.82.8. | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timezone | |
| import requests | |
GITLAB_URL = "https://gitlab.com"
GROUP_NAME = "ArthurAI"
# Personal/group access token with read_api scope; empty string when unset
# (main() aborts in that case).
TOKEN = os.environ.get("GITLAB_TOKEN", "")
# Time window (UTC today)
TODAY = datetime.now(timezone.utc).date()
WINDOW_START = datetime(TODAY.year, TODAY.month, TODAY.day, 8, 0, 0, tzinfo=timezone.utc)
WINDOW_END = datetime(TODAY.year, TODAY.month, TODAY.day, 12, 44, 0, tzinfo=timezone.utc)
# Versions to hunt for
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
# Match both "litellm-1.82.7" (installed/wheel names) and "litellm==1.82.7"
# (pip requirement specs). The previous pattern consumed a single "=" and
# then required a digit, so the common "==" spelling never matched.
VERSION_PATTERN = re.compile(r"litellm[=\-]=?(\d+\.\d+\.\d+)", re.IGNORECASE)
# Shared HTTP session; GitLab authenticates via the PRIVATE-TOKEN header.
HEADERS = {"PRIVATE-TOKEN": TOKEN}
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
def get_paginated(url, params=None):
    """Yield every item across a paginated GitLab collection endpoint."""
    query = dict(params or {})
    query.setdefault("per_page", 100)
    page = 1
    while True:
        query["page"] = page
        resp = SESSION.get(url, params=query, timeout=30)
        resp.raise_for_status()
        batch = resp.json()
        if not batch:
            return
        yield from batch
        # A short page means this was the final one.
        if len(batch) < query["per_page"]:
            return
        page += 1
def get_group_id(group_name):
    """
    Resolve a GitLab group name or full path to its numeric id.

    The /groups/:id endpoint accepts a path in place of the id, but that path
    must be URL-encoded — subgroup paths contain "/" (e.g. "parent/child"),
    which would otherwise be read as extra URL segments. Encoding is a no-op
    for simple names like "ArthurAI", so behavior is backward-compatible.
    """
    from urllib.parse import quote  # local import: only needed here
    encoded = quote(str(group_name), safe="")
    resp = SESSION.get(f"{GITLAB_URL}/api/v4/groups/{encoded}", timeout=30)
    resp.raise_for_status()
    return resp.json()["id"]
def get_all_projects(group_id):
    """Return id/name for every non-archived project under the group tree."""
    url = f"{GITLAB_URL}/api/v4/groups/{group_id}/projects"
    query = {"include_subgroups": "true", "archived": "false"}
    return [
        {"id": proj["id"], "name": proj["path_with_namespace"]}
        for proj in get_paginated(url, query)
    ]
def parse_ts(ts_str):
    """Convert a GitLab ISO-8601 timestamp to an aware datetime; None if empty."""
    if not ts_str:
        return None
    # GitLab may use a trailing "Z" or an explicit "+00:00" offset.
    return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
def jobs_in_window(project_id):
    """
    Return all jobs for the project whose started_at (falling back to
    created_at) lies inside [WINDOW_START, WINDOW_END].

    Pagination stops early once a job older than the window is seen.
    NOTE(review): this assumes the API lists jobs newest-first (descending
    id) — confirm ordering for this endpoint; if ordering differs, early
    stop could drop in-window jobs on later pages.
    """
    matching = []
    url = f"{GITLAB_URL}/api/v4/projects/{project_id}/jobs"
    # scope[] limits results to jobs that actually executed (or are running);
    # pending/created/skipped jobs have no useful trace anyway.
    params = {"per_page": 100, "scope[]": ["success", "failed", "canceled", "running"]}
    page = 1
    while True:
        params["page"] = page
        resp = SESSION.get(url, params=params, timeout=30)
        if resp.status_code == 403:
            return matching  # no access to this project's jobs
        resp.raise_for_status()
        jobs = resp.json()
        if not jobs:
            break
        stop_early = False
        for job in jobs:
            # Use started_at if available, fall back to created_at
            ts = parse_ts(job.get("started_at") or job.get("created_at"))
            if ts is None:
                continue
            if ts > WINDOW_END:
                continue  # too recent, keep paging
            if ts < WINDOW_START:
                stop_early = True  # everything after this will be older
                continue
            matching.append(job)
        if stop_early or len(jobs) < 100:
            break
        page += 1
    return matching
def fetch_trace(project_id, job_id):
    """Return a job's raw trace text, or "" when it is missing or forbidden."""
    url = f"{GITLAB_URL}/api/v4/projects/{project_id}/jobs/{job_id}/trace"
    resp = SESSION.get(url, timeout=60)
    if resp.status_code not in (403, 404):
        resp.raise_for_status()
        return resp.text
    return ""
def check_job(project_name, project_id, job):
    """
    Fetch one job's trace and scan it for target litellm versions.

    Returns a summary dict (project, ids, versions, up to 10 matching trace
    lines) when a hit is found, else None.

    Fix: the original collected versions with a whole-trace finditer but
    filtered context lines by only the FIRST regex match on each line, so a
    line whose later match was a target version counted toward `versions`
    yet never showed in `context`. A single per-line pass over every match
    keeps the two consistent (and drops the duplicated search per line).
    """
    job_id = job["id"]
    trace = fetch_trace(project_id, job_id)
    if not trace:
        return None  # trace missing, forbidden, or empty
    found_versions = set()
    context_lines = []
    for line in trace.splitlines():
        line_hit = False
        for m in VERSION_PATTERN.finditer(line):
            if m.group(1) in TARGET_VERSIONS:
                found_versions.add(m.group(1))
                line_hit = True
        if line_hit:
            context_lines.append(line.strip())
    if not found_versions:
        return None
    return {
        "project": project_name,
        "project_id": project_id,
        "job_id": job_id,
        "job_name": job["name"],
        "ref": job.get("ref", ""),
        "started_at": job.get("started_at", job.get("created_at", "")),
        "versions": sorted(found_versions),
        "context": context_lines[:10],  # cap at 10 lines
        "job_url": f"{GITLAB_URL}/{project_name}/-/jobs/{job_id}",
    }
def main():
    """
    Entry point: resolve the group, list its projects, gather in-window jobs,
    then fetch each trace concurrently and report litellm target-version hits.
    """
    if not TOKEN:
        print("ERROR: Set GITLAB_TOKEN environment variable.", file=sys.stderr)
        sys.exit(1)
    # NOTE: the arrow below was previously mojibake ("‚Üí") from a UTF-8 "→"
    # decoded with the wrong codec; restored to the intended character.
    print(f"Time window : {WINDOW_START.isoformat()} → {WINDOW_END.isoformat()}")
    print(f"Hunting for : litellm {', '.join(sorted(TARGET_VERSIONS))}")
    print()
    print(f"Resolving group '{GROUP_NAME}'...")
    group_id = get_group_id(GROUP_NAME)
    print("Fetching projects...")
    projects = get_all_projects(group_id)
    print(f" Found {len(projects)} projects")
    print()
    # Collect all jobs in window across all projects
    all_jobs_to_check = []  # list of (project_name, project_id, job)
    print("Scanning job listings for time window...")
    for proj in projects:
        try:
            jobs = jobs_in_window(proj["id"])
        except requests.HTTPError as e:
            # Keep scanning the remaining projects on per-project API failures.
            print(f" WARN: {proj['name']} — {e}", file=sys.stderr)
            continue
        if jobs:
            print(f" {proj['name']}: {len(jobs)} job(s) in window")
            for j in jobs:
                all_jobs_to_check.append((proj["name"], proj["id"], j))
    total = len(all_jobs_to_check)
    print(f"\nFetching traces for {total} job(s)...")
    print()
    hits = []
    # Trace download dominates runtime, so fetch/scan jobs concurrently.
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = {
            pool.submit(check_job, pname, pid, job): (pname, job["id"])
            for pname, pid, job in all_jobs_to_check
        }
        done = 0
        for future in as_completed(futures):
            done += 1
            pname, jid = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f" ERROR checking {pname} job {jid}: {e}", file=sys.stderr)
                continue
            if result:
                hits.append(result)
            # Simple progress
            print(f" [{done}/{total}] checked {pname} job {jid}" +
                  (f" *** HIT: litellm {result['versions']} ***" if result else ""),
                  flush=True)
    # ── Summary ──────────────────────────────────────────────────────────────
    print()
    print("=" * 72)
    print(f"RESULTS: {len(hits)} job(s) installed litellm {' or '.join(sorted(TARGET_VERSIONS))}")
    print("=" * 72)
    if not hits:
        print("No matches found.")
        return
    for h in sorted(hits, key=lambda x: x["started_at"]):
        print()
        print(f" Project : {h['project']}")
        print(f" Job : {h['job_name']} (#{h['job_id']})")
        print(f" Branch/tag: {h['ref']}")
        print(f" Started : {h['started_at']}")
        print(f" Versions : litellm {', '.join(h['versions'])}")
        print(f" URL : {h['job_url']}")
        print(" Log lines :")
        for line in h["context"]:
            print(f" {line}")
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Scan the local Mac filesystem (run with sudo) for installed litellm 1.82.7 / 1.82.8. | |
| Searches every Python environment on the machine: | |
| - System / Homebrew / pyenv / conda / virtualenvs / pipx / user installs | |
| Strategy: find all litellm-*.dist-info directories, read their METADATA | |
| to confirm the exact version, then report the containing environment. | |
| """ | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
# litellm releases we are hunting for.
TARGET_VERSIONS = {"1.82.7", "1.82.8"}
# Directories to skip — either non-filesystem pseudo-paths or known APFS
# duplicates that would double-count everything under /
PRUNE_PATHS = {
    "/System/Volumes/Data",  # APFS data volume (same content as /)
    "/System/Volumes/VM",
    "/System/Volumes/Preboot",
    "/System/Volumes/Recovery",
    "/System/Volumes/Update",
    "/private/var/vm",
    "/dev",
    "/proc",
    "/.Spotlight-V100",
    "/.fseventsd",
    "/.DocumentRevisions-V100",
}
# Matches the "Version: X.Y.Z" header line inside a dist-info METADATA file.
VERSION_RE = re.compile(r"^Version:\s*(.+)$", re.MULTILINE | re.IGNORECASE)
def find_dist_info_dirs():
    """
    Use the system `find` binary to locate litellm-*.dist-info directories
    across the whole filesystem, pruning irrelevant subtrees for speed.
    """
    # Each prune path becomes "-path P -prune -o" so find skips the subtree.
    prune_expr = []
    for prune_path in sorted(PRUNE_PATHS):
        prune_expr.extend(["-path", prune_path, "-prune", "-o"])
    cmd = ["find", "/", *prune_expr,
           "-type", "d", "-name", "litellm-*.dist-info", "-print"]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        print("ERROR: `find` not found.", file=sys.stderr)
        sys.exit(1)
    discovered = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
    # Surface only unexpected find errors (permission denied is routine).
    for err_line in proc.stderr.splitlines():
        if "Permission denied" not in err_line:
            print(f" find: {err_line}", file=sys.stderr)
    return discovered
def read_metadata_version(dist_info_dir):
    """Return the package version recorded in a .dist-info directory, or None."""
    base = Path(dist_info_dir)
    for candidate in ("METADATA", "PKG-INFO"):
        meta_file = base / candidate
        if not meta_file.exists():
            continue
        try:
            match = VERSION_RE.search(meta_file.read_text(errors="replace"))
        except OSError:
            continue  # unreadable file: try the next metadata name
        if match:
            return match.group(1).strip()
    # No readable metadata: derive the version from the directory name itself,
    # e.g. "litellm-1.82.7.dist-info" -> "1.82.7".
    pieces = base.name.replace(".dist-info", "").split("-")
    return pieces[1] if len(pieces) >= 2 else None
def classify_environment(dist_info_dir):
    """
    Walk up from the dist-info path to guess the owning Python environment
    (type, root directory, and macOS user) from path heuristics alone.
    """
    resolved = Path(dist_info_dir).resolve()
    site_pkg = resolved.parent  # dist-info lives directly inside site-packages
    # Look upward (a few levels) for <root>/bin/python{3,} to find the env root.
    env_root = None
    probe = site_pkg
    for _ in range(6):
        probe = probe.parent
        interpreter = probe / "bin" / "python3"
        if not interpreter.exists():
            interpreter = probe / "bin" / "python"
        if interpreter.exists():
            env_root = probe
            break
    path_str = str(site_pkg)
    lowered = path_str.lower()
    # Heuristic classification by well-known path fragments.
    if "/homebrew/" in lowered or "/cellar/" in lowered:
        env_type = "homebrew"
    elif "/.pyenv/" in path_str:
        env_type = "pyenv"
    elif "/miniconda" in path_str or "/anaconda" in path_str or "/conda/" in path_str:
        env_type = "conda"
    elif "/pipx/" in path_str:
        env_type = "pipx"
    elif "/venv/" in path_str or "/.venv/" in path_str or "/virtualenv" in path_str:
        env_type = "virtualenv"
    elif "/Library/Python/" in path_str:
        env_type = "system-user" if "/Users/" in path_str else "system"
    elif path_str.startswith("/usr/local/lib"):
        env_type = "usr-local"
    elif path_str.startswith("/usr/lib"):
        env_type = "system"
    elif "/site-packages" in path_str:
        env_type = "virtualenv/other"
    else:
        env_type = "unknown"
    # A "/Users/<name>/" component identifies the owning macOS user.
    owner = None
    segments = path_str.split("/Users/")
    if len(segments) > 1:
        owner = segments[1].split("/")[0]
    return {
        "env_type": env_type,
        "env_root": str(env_root) if env_root else str(site_pkg),
        "site_pkgs": str(site_pkg),
        "owner": owner,
    }
def main():
    """
    Scan the local machine for installed litellm packages and report any
    matching TARGET_VERSIONS. Returns 1 when vulnerable installs were found
    (used as the process exit code via sys.exit), else 0.
    """
    # Root is not required, but without it `find` cannot enter protected dirs.
    if os.geteuid() != 0:
        print("WARNING: Not running as root — some paths may be inaccessible.")
        print(" Re-run with: sudo python3 scan_litellm_local.py\n")
    print("Scanning filesystem for litellm dist-info directories...")
    print("(This may take 30-90 seconds on a full disk scan)\n")
    dist_dirs = find_dist_info_dirs()
    print(f"Found {len(dist_dirs)} litellm dist-info director{'y' if len(dist_dirs)==1 else 'ies'} total.\n")
    # Partition every discovered install into target-version hits vs. others.
    hits = []
    others = []
    for d in dist_dirs:
        version = read_metadata_version(d)
        env = classify_environment(d)
        entry = {"path": d, "version": version, "env": env}
        if version in TARGET_VERSIONS:
            hits.append(entry)
        else:
            others.append(entry)
    # ── Report ────────────────────────────────────────────────────────────────
    print("=" * 72)
    print(f"VULNERABLE VERSIONS FOUND: {len(hits)}")
    print(f" (litellm {' / '.join(sorted(TARGET_VERSIONS))})")
    print("=" * 72)
    if hits:
        for h in hits:
            env = h["env"]
            print()
            print(f" Version : litellm {h['version']} *** MATCH ***")
            print(f" Env type : {env['env_type']}")
            print(f" Owner user : {env['owner'] or '(system)'}")
            print(f" Env root : {env['env_root']}")
            print(f" Site-pkgs : {env['site_pkgs']}")
            print(f" dist-info : {h['path']}")
    else:
        print("\n No installations of litellm 1.82.7 or 1.82.8 found.")
    # Non-matching installs are listed too, for situational awareness.
    if others:
        print()
        print("─" * 72)
        print(f"Other litellm versions present on this machine ({len(others)}):")
        for o in sorted(others, key=lambda x: (x["version"] or "", x["path"])):
            env = o["env"]
            owner = f" [{env['owner']}]" if env["owner"] else ""
            print(f" litellm {o['version'] or '(unknown)':12s} {env['env_type']:18s}{owner}")
            print(f" {o['env']['site_pkgs']}")
    print()
    print(f"Scan complete. {len(hits)} vulnerable installation(s) found.")
    return 1 if hits else 0
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment