Last active
December 22, 2022 20:12
-
-
Save dryan/70060446d2e9aad0b366de4bba7872e3 to your computer and use it in GitHub Desktop.
Update and audit GitHub Actions workflow YAML files for outdated action versions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-i https://pypi.org/simple | |
anyio==3.6.2; python_full_version >= '3.6.2' | |
certifi==2022.12.7; python_version >= '3.6' | |
click==8.1.3; python_version >= '3.7' | |
colorama==0.4.6 | |
commonmark==0.9.1 | |
h11==0.14.0; python_version >= '3.7' | |
httpcore==0.16.3; python_version >= '3.7' | |
httpx==0.23.1 | |
idna==3.4 | |
pygments==2.13.0; python_version >= '3.6' | |
pyyaml==6.0; python_version >= '3.6' | |
rfc3986[idna2008]==1.5.0 | |
rich==12.6.0 | |
saneyaml==0.5.2 | |
shellingham==1.5.0 | |
sniffio==1.3.0; python_version >= '3.7' | |
typer[all]==0.7.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import datetime | |
import json | |
import pathlib | |
import typing | |
import httpx | |
import saneyaml | |
import typer | |
from rich import print | |
# Default root directory searched for workflow files when a command is not
# given an explicit base path ("~" is expanded by the commands that use it).
BASE_PROJECTS_PATH = "~/Sites"
# Typer application on which the commands in this file are registered.
app = typer.Typer()
@app.command("action_version")
def get_latest_version_of_action(action: str, echo: bool = True) -> str:
    """Return the newest major release tag (or default branch) of an action.

    Results are stored in ``.cache/action_versions.json`` next to this file so
    that repeated lookups do not hit the GitHub API again.

    Args:
        action: Action reference without the ``@version`` suffix, e.g.
            ``actions/checkout``, ``github/codeql-action/init`` or just
            ``checkout`` (which is looked up under the ``actions`` owner).
        echo: When true, print the resolved tag.

    Returns:
        The part of the latest release's tag name before the first ``.``
        (``v3.1.0`` -> ``v3``), or the repository's default branch when the
        repository has no releases.

    Raises:
        httpx.HTTPStatusError: If the GitHub API answers with an error status.
    """
    if "/" not in action:
        action = f"actions/{action}"
    # An action may live in a sub-directory (``owner/repo/path``); the API
    # endpoints used below only accept the ``owner/repo`` part.
    repo = "/".join(action.split("/")[:2])

    action_cache = {}
    cache_path = pathlib.Path(__file__).parent / ".cache/action_versions.json"
    # if the cache is older than 24 hours, delete it
    if (
        cache_path.exists()
        and cache_path.stat().st_mtime < datetime.datetime.now().timestamp() - 86400
    ):
        cache_path.unlink()
    if cache_path.exists():
        action_cache = json.loads(cache_path.read_text())

    if repo in action_cache:
        tag = action_cache[repo]
    else:
        url = f"https://api.github.com/repos/{repo}/releases/latest"
        response = httpx.get(url)
        if response.status_code == 404:
            # this repo doesn't use releases, so use the default branch
            url = f"https://api.github.com/repos/{repo}"
            repo_response = httpx.get(url)
            repo_response.raise_for_status()
            tag = repo_response.json()["default_branch"]
        else:
            response.raise_for_status()
            tag = response.json()["tag_name"].split(".")[0]
        action_cache[repo] = tag
        # Only write on a cache miss: rewriting on every call would keep
        # refreshing the file's mtime and the 24 hour expiry would never fire.
        # The directory is created first because it does not exist initially.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        cache_path.write_text(json.dumps(action_cache, indent=2))
    if echo:
        print(tag)
    return tag
def update_yaml_file(
    path: str,
) -> bool:
    """Rewrite the ``uses`` references of one workflow file to their latest version.

    Steps pinned to ``main`` or ``master`` are left alone, as are references
    that carry no ``@version`` (local ``./`` actions) and ``docker://`` images.

    Args:
        path: Location of the workflow YAML file.

    Returns:
        True when the file was rewritten, False when nothing had to change.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path_obj = pathlib.Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(path)
    original_text = path_obj.read_text()
    # An empty file loads as None; treat it as a workflow without jobs.
    workflow = saneyaml.load(original_text) or {}
    # NOTE(review): presumably the loader can return the bare ``on`` key as
    # boolean True (YAML 1.1 behaviour); map it back to the string "on".
    workflow = {
        ("on" if key is True else key): value for key, value in workflow.items()
    }

    changed = False
    for job in workflow.get("jobs", {}).values():
        # Jobs that call a reusable workflow have no "steps" list.
        for step in job.get("steps", []):
            uses = step.get("uses", "")
            if "@" not in uses or uses.startswith(("./", "docker://")):
                continue
            action, current_version = uses.rsplit("@", 1)
            if current_version in ("main", "master"):
                continue
            latest_version = get_latest_version_of_action(action, echo=False)
            if latest_version != current_version:
                step["uses"] = f"{action}@{latest_version}"
                changed = True

    # NOTE(review): the dump apparently renders empty values as the string
    # 'null'; strip that so the key is written with an empty value again.
    updated_workflow = saneyaml.dump(workflow).replace(": 'null'", ":")
    # Write when a version changed, or to clean up a 'null' left in the file.
    if changed or ": 'null'" in original_text:
        path_obj.write_text(updated_workflow)
        return True
    return False
@app.command("update_workflow")
def update_workflow(
    path: str,
) -> None:
    """Update the workflow file at ``path`` and print whether it was changed."""
    was_updated = update_yaml_file(path)
    message = f"Updated {path}" if was_updated else f"No updates for {path}"
    print(message)
@app.command("update_all_workflows")
def update_all_yaml_files(base_path: typing.Optional[str] = None) -> None:
    """Update every workflow file found below ``base_path``.

    Args:
        base_path: Directory to search recursively for
            ``.github/workflows/*.yml`` / ``*.yaml`` files. Defaults to
            ``BASE_PROJECTS_PATH``. Anything inside ``node_modules`` is ignored.
    """
    if base_path is None:
        base_path = BASE_PROJECTS_PATH
    # The extension is checked explicitly: a glob such as "*[.yml]" is a
    # character class and would match any name ending in ".", "y", "m" or "l".
    workflows = [
        candidate
        for candidate in pathlib.Path(base_path)
        .expanduser()
        .glob("**/.github/workflows/*")
        if candidate.suffix in (".yml", ".yaml")
        and candidate.is_file()
        and "node_modules" not in candidate.parts
    ]
    updated = 0
    for path in workflows:
        if update_yaml_file(path):
            print(f"Updated {path}")
            updated += 1
    print(f"Updated {updated} workflow{'' if updated == 1 else 's'}.")
class Uses(typing.TypedDict):
    """One ``uses`` reference of a workflow step together with its audit result."""

    # Action reference without the "@version" suffix.
    action: str
    # Version the workflow currently pins.
    current_version: str
    # Version reported as the latest one for the action.
    latest_version: str
    # Whether the pinned version is considered outdated.
    needs_update: bool
def _major_version(ref: str) -> typing.Optional[int]:
    """Return the major number of a ``v3`` / ``3.1.0`` style ref, else None."""
    number = ref[1:] if ref[:1] in ("v", "V") else ref
    parts = number.split(".")
    if all(part.isascii() and part.isdigit() for part in parts):
        return int(parts[0])
    return None


def _needs_update(current_version: str, latest_version: str) -> bool:
    """Tell whether ``current_version`` lags behind ``latest_version``."""
    current_major = _major_version(current_version)
    latest_major = _major_version(latest_version)
    if current_major is not None and latest_major is not None:
        # Compare numerically: as plain strings "v10" would sort before "v9".
        return current_major < latest_major
    # Branch names and commit hashes cannot be ordered; flag any difference.
    return current_version != latest_version


@app.command("audit_workflow")
def audit_workflow(
    path: str,
    echo: bool = True,
) -> typing.List[Uses]:
    """Report, for every action used in a workflow file, whether it is outdated.

    References without an ``@version`` (local ``./`` actions) and
    ``docker://`` images are skipped because they have no release to compare.

    Args:
        path: Location of the workflow YAML file.
        echo: When true, print one coloured line per audited action.

    Returns:
        One ``Uses`` entry per audited step, in file order.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    path_obj = pathlib.Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(path)
    # An empty file loads as None; treat it as a workflow without jobs.
    workflow = saneyaml.load(path_obj.read_text()) or {}
    actions = []
    for job in workflow.get("jobs", {}).values():
        # Jobs that call a reusable workflow have no "steps" list.
        for step in job.get("steps", []):
            uses = step.get("uses", "")
            if "@" not in uses or uses.startswith(("./", "docker://")):
                continue
            action, current_version = uses.rsplit("@", 1)
            latest_version = get_latest_version_of_action(action, echo=False)
            actions.append(
                Uses(
                    action=action,
                    current_version=current_version,
                    latest_version=latest_version,
                    needs_update=_needs_update(current_version, latest_version),
                )
            )
    if echo:
        print(f"Workflow audit for {path}:")
        for action in actions:
            color = "bold red" if action["needs_update"] else "green"
            emoji = "🚨" if action["needs_update"] else "✅"
            print(
                f"{emoji} [{color}]{action['action']}@{action['current_version']} "
                f"-> {action['latest_version']}[/{color}]"
            )
    return actions
class AuditedWorkflow(typing.TypedDict):
    """Audit result of a single workflow file."""

    # Location of the audited workflow file.
    path: str
    # Audit entries for the actions referenced by that file.
    uses: typing.List[Uses]
@app.command("audit_all_workflows")
def audit_all_workflows(
    base_path: typing.Optional[str] = None,
    echo: bool = True,
) -> typing.List[AuditedWorkflow]:
    """Audit every workflow file found below ``base_path``.

    Args:
        base_path: Directory to search recursively for
            ``.github/workflows/*.yml`` / ``*.yaml`` files. Defaults to
            ``BASE_PROJECTS_PATH``. Anything inside ``node_modules`` is ignored.
        echo: When true, print a summary followed by one coloured line per
            audited action.

    Returns:
        One ``AuditedWorkflow`` entry per workflow file.
    """
    if base_path is None:
        base_path = BASE_PROJECTS_PATH
    # The extension is checked explicitly: a glob such as "*[.yml]" is a
    # character class and would match any name ending in ".", "y", "m" or "l".
    workflows = [
        candidate
        for candidate in pathlib.Path(base_path)
        .expanduser()
        .glob("**/.github/workflows/*")
        if candidate.suffix in (".yml", ".yaml")
        and candidate.is_file()
        and "node_modules" not in candidate.parts
    ]
    audits = [
        AuditedWorkflow(path=path, uses=audit_workflow(path, echo=False))
        for path in workflows
    ]
    if echo:
        total = len(audits)
        print(f"Audited {total} workflow{'' if total == 1 else 's'}.")
        for audit in audits:
            print(f"Workflow audit for {audit['path']}:")
            for action in audit["uses"]:
                color = "bold red" if action["needs_update"] else "green"
                emoji = "🚨" if action["needs_update"] else "✅"
                print(
                    f"{emoji} [{color}]{action['action']}@{action['current_version']} "
                    f"-> {action['latest_version']}[/{color}]"
                )
    return audits
class WorflowUsesSpecifiedAction(typing.TypedDict):
    """Whether one workflow file references the action that was searched for."""

    # Location of the inspected workflow file.
    path: str
    # True when at least one step of the workflow uses the action.
    uses_specified_action: bool
@app.command("check_for_action")
def check_for_action(
    action: str,
    base_path: typing.Optional[str] = None,
    echo: bool = True,
    return_all_projects: bool = False,
) -> typing.List[WorflowUsesSpecifiedAction]:
    """Find the workflow files below ``base_path`` that use ``action``.

    Args:
        action: Action reference without the ``@version`` suffix. A bare name
            such as ``checkout`` is looked up as ``actions/checkout``.
        base_path: Directory to search recursively for
            ``.github/workflows/*.yml`` / ``*.yaml`` files. Defaults to
            ``BASE_PROJECTS_PATH``. Anything inside ``node_modules`` is ignored.
        echo: When true, print a summary followed by one line per result.
        return_all_projects: When true, also include the workflows that do
            not use the action.

    Returns:
        One entry per reported workflow file.
    """
    # Workflows always spell out "owner/name", so a bare name could never
    # match; expand it the same way the version lookup does.
    if "/" not in action:
        action = f"actions/{action}"
    if base_path is None:
        base_path = BASE_PROJECTS_PATH
    # The extension is checked explicitly: a glob such as "*[.yml]" is a
    # character class and would match any name ending in ".", "y", "m" or "l".
    workflows = [
        candidate
        for candidate in pathlib.Path(base_path)
        .expanduser()
        .glob("**/.github/workflows/*")
        if candidate.suffix in (".yml", ".yaml")
        and candidate.is_file()
        and "node_modules" not in candidate.parts
    ]
    uses_specified_action = []
    for path in workflows:
        uses = audit_workflow(path, echo=False)
        uses_specified_action.append(
            WorflowUsesSpecifiedAction(
                path=path,
                uses_specified_action=any(x["action"] == action for x in uses),
            )
        )
    # Count before filtering so the summary reflects every inspected file.
    total_audited = len(uses_specified_action)
    if not return_all_projects:
        uses_specified_action = [
            x for x in uses_specified_action if x["uses_specified_action"]
        ]
    if echo:
        print(f"Audited {total_audited} workflow{'' if total_audited == 1 else 's'}\n")
        for audit in uses_specified_action:
            color = "bold red" if audit["uses_specified_action"] else "green"
            emoji = "🚨" if audit["uses_specified_action"] else "✅"
            print(
                f"{emoji} [{color}]{audit['path']} "
                f"-> {'Yes' if audit['uses_specified_action'] else 'No'}[/{color}]\n"
            )
    return uses_specified_action
# Run the Typer command-line application when this file is executed directly.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is unsupported. Feel free to use and modify for yourself!