Detect potentially vulnerable GitHub Actions workflows for orgs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# more info: https://nathandavison.com/blog/github-actions-and-the-threat-of-malicious-pull-requests
import requests | |
import yaml | |
import re | |
import json | |
import time | |
from argparse import ArgumentParser | |
class GitHub:
    """Small GitHub API client that looks for risky ``pull_request_target`` workflows.

    A workflow job is reported when the workflow is triggered by
    ``pull_request_target`` and one of the job's steps uses
    ``actions/checkout`` with a ``with:`` block that mentions ``pull_request``
    (for example a ``ref`` pointing at the pull request head).

    Instance state: ``session`` (a ``requests`` session), ``verbose`` (print
    progress when true) and ``headers`` (sent with every request; carries the
    ``Authorization`` token when one was supplied).
    """

    # Total number of tries for a request that keeps failing.
    MAX_ATTEMPTS = 5
    # Seconds to wait for a response before the try counts as failed.
    REQUEST_TIMEOUT = 30
    # Seconds to sleep when rate limited but no usable reset time was sent.
    DEFAULT_RATE_LIMIT_SLEEP = 60

    def __init__(self, token=None, verbose=False):
        """Create the HTTP session and, if ``token`` is given, the auth header."""
        self.session = requests.session()
        self.verbose = verbose
        self.headers = {}
        if token:
            self.headers['Authorization'] = 'token %s' % (token)

    @staticmethod
    def _header_int(res, name, default):
        """Return header ``name`` of ``res`` as an int, or ``default`` if absent/invalid."""
        try:
            return int(res.headers.get(name, default))
        except (TypeError, ValueError):
            return default

    def _wait_for_rate_limit_reset(self, res):
        """Sleep until the reset time advertised by ``res`` (always at least 1 second)."""
        reset_time = self._header_int(res, 'x-ratelimit-reset', None)
        if reset_time is None:
            sleep_time = self.DEFAULT_RATE_LIMIT_SLEEP
        else:
            # Clamp so clock skew can never produce a negative sleep.
            sleep_time = max(reset_time - int(time.time()) + 1, 1)
        if self.verbose:
            print('Rate limiting in effect, sleeping for %s seconds...' % (sleep_time))
        time.sleep(sleep_time)

    def request(self, url, page=1, attempt=1, raw=False):
        """GET ``url`` and return the decoded response.

        Args:
            url: Absolute URL to fetch.
            page: Value sent as the ``page`` query parameter.
            attempt: Number of the current try (1-based); the request is
                abandoned once it exceeds ``MAX_ATTEMPTS``.
            raw: When true, return the body as bytes without attempting to
                decode it as JSON.

        Returns:
            Parsed JSON when the body decodes as JSON (and ``raw`` is false),
            otherwise the body bytes. An empty string is returned for a 404,
            for a 403 that is not a rate limit, and when every try failed.
        """
        while True:
            try:
                res = self.session.get(
                    url,
                    headers=self.headers,
                    params={'page': page},
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException:
                # Connection problems and timeouts are retried below.
                res = None

            if res is not None:
                status = res.status_code
                if status == 404:
                    return ''
                # Only a rejected request is treated as rate limited; a
                # successful response that used up the quota is still valid.
                if status in (403, 429) and self._header_int(res, 'x-ratelimit-remaining', 1) == 0:
                    self._wait_for_rate_limit_reset(res)
                    continue
                if status == 403:
                    if self.verbose:
                        print('HTTP 403 unrelated to rate limitng, skipping this request')
                    return ''
                if res.ok:
                    if raw:
                        return res.content
                    try:
                        return res.json()
                    except ValueError:
                        # Every JSON decode error is a ValueError subclass.
                        return res.content

            attempt += 1
            if attempt > self.MAX_ATTEMPTS:
                print('Request failed after %s attempts - aborting this request.' % (self.MAX_ATTEMPTS))
                return ''
            if self.verbose:
                print('Request errored - retrying attempt %s' % (attempt))

    def get_org_members(self, org):
        """Return the ``login`` of every member listed for ``org``, following pagination."""
        url = 'https://api.github.com/orgs/%s/members' % (org)
        members = []
        page = 1
        while True:
            res = self.request(url, page)
            # Anything but a non-empty list (empty page, failure, raw bytes) ends paging.
            if not isinstance(res, list) or not res:
                if self.verbose:
                    print('Finished finding members for %s' % (org))
                break
            for member in res:
                if isinstance(member, dict) and 'login' in member:
                    if self.verbose:
                        print('Found member %s' % (member['login']))
                    members.append(member['login'])
            page += 1
        return members

    def get_org_repos(self, org, is_user=False):
        """Return the repository names owned by ``org``.

        Args:
            org: Organisation name, or a user login when ``is_user`` is true.
            is_user: Query the ``users`` endpoint instead of ``orgs``.
        """
        org_type = 'users' if is_user else 'orgs'
        url = 'https://api.github.com/%s/%s/repos' % (org_type, org)
        repos = []
        page = 1
        while True:
            res = self.request(url, page)
            if not isinstance(res, list):
                res = []
            if not res and not repos:
                print('Failed to get repos for "%s"' % (org))
                break
            if not res:
                if self.verbose:
                    print('Finished finding repos for %s' % (org))
                break
            for repo in res:
                if isinstance(repo, dict) and 'name' in repo:
                    if self.verbose:
                        print('Found repo %s' % (repo['name']))
                    repos.append(repo['name'])
            page += 1
        return repos

    @staticmethod
    def _find_vulnerable_jobs(yaml_html_url, workflow):
        """Return ``(yaml_html_url, job_name)`` for each risky checkout step in a parsed workflow."""
        if not isinstance(workflow, dict):
            return []
        # PyYAML parses a bare `on` key as the boolean True; a quoted "on"
        # stays a string, so both spellings are checked.
        events = workflow.get(True)
        if events is None:
            events = workflow.get('on')
        if not isinstance(events, (str, list, dict)) or 'pull_request_target' not in events:
            return []
        jobs = workflow.get('jobs')
        if not isinstance(jobs, dict):
            return []

        found = []
        for job_name, job in jobs.items():
            steps = job.get('steps') if isinstance(job, dict) else None
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                if 'actions/checkout' not in str(step.get('uses') or ''):
                    continue
                if 'pull_request' in str(step.get('with', '')):
                    found.append((yaml_html_url, job_name))
        return found

    def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
        """Parse workflow YAML and return ``(yaml_html_url, job_name)`` tuples for risky jobs.

        Args:
            yaml_html_url: URL reported alongside each finding.
            yaml_raw: Workflow file content (text or bytes); empty content
                yields no findings.

        Raises:
            yaml.YAMLError: If the content is not valid YAML.
        """
        if not yaml_raw:
            return []
        # safe_load only: the workflow content comes from arbitrary repositories.
        return self._find_vulnerable_jobs(yaml_html_url, yaml.safe_load(yaml_raw))

    @staticmethod
    def _workflow_branch(repo, html_url, path):
        """Extract the branch from a ``.../<repo>/blob/<branch>/<path>`` URL, or return None."""
        # Case-insensitive because the caller-supplied owner may differ in
        # case from the canonical name used in the URL.
        match = re.search(r'/%s/blob/(.+)$' % (re.escape(repo.strip('/'))), html_url, re.IGNORECASE)
        if not match:
            return None
        tail = match.group(1)
        suffix = '/' + path.lstrip('/')
        if tail.endswith(suffix):
            # Everything before the file path is the branch, slashes included.
            return tail[:-len(suffix)] or None
        return tail.split('/', 1)[0] or None

    def get_workflows(self, repo):
        """Fetch every workflow of ``repo`` (``owner/name``) and return the risky jobs found.

        Returns:
            A list of ``(workflow_html_url, job_name)`` tuples.
        """
        api_url = 'https://api.github.com/repos/%s/actions/workflows' % (repo)
        vulnerable_workflows = []
        page = 1
        while True:
            res = self.request(api_url, page)
            if not res or not isinstance(res, dict):
                if self.verbose:
                    print('Failed to get https://api.github.com/repos/%s/actions/workflows' % (repo))
                break
            workflows = res.get('workflows') or []
            if self.verbose and page == 1:
                print('Starting finding workflows for %s' % (repo))
            if not workflows:
                if self.verbose:
                    print('Finished finding workflows for %s' % (repo))
                break
            for workflow in workflows:
                if not isinstance(workflow, dict):
                    continue
                yaml_path = workflow.get('path')
                yaml_html_url = workflow.get('html_url')
                if not (yaml_path and yaml_html_url):
                    continue
                workflow_branch = self._workflow_branch(repo, yaml_html_url, yaml_path)
                if not workflow_branch:
                    continue
                yaml_raw_url = 'https://raw.githubusercontent.com/%s/%s/%s' % (repo, workflow_branch, yaml_path)
                if self.verbose:
                    print('Checking %s...' % (yaml_raw_url))
                # raw=True keeps the body as bytes even if it is valid JSON.
                yaml_raw = self.request(yaml_raw_url, raw=True)
                try:
                    vulnerable_workflows.extend(self.check_vulnerable_workflows(yaml_html_url, yaml_raw))
                except yaml.YAMLError as e:
                    print(f'Error while parsing yaml for {yaml_html_url}: {e}')
                    continue
            page += 1
        return vulnerable_workflows
def main(args):
    """Scan the requested repositories and print every risky workflow job.

    Args:
        args: Parsed command-line namespace providing ``org``, ``repo``,
            ``token``, ``verbose`` and ``members``.

    Raises:
        SystemExit: With code 1 when no org was supplied.
    """
    if not args.org:
        print('Must supply a org value')
        # Raised directly: the ``exit`` builtin is only injected by ``site``.
        raise SystemExit(1)

    org = args.org
    gh = GitHub(args.token, args.verbose)

    # Each target is a full "owner/repo" slug.
    if args.repo:
        targets = [f'{org}/{args.repo}']
    elif args.members:
        # Member repositories live under the member's own account, so the
        # member login (not the org) is the owner part of the slug.
        targets = []
        for member in gh.get_org_members(org):
            targets.extend(
                f'{member}/{repo}' for repo in gh.get_org_repos(member, is_user=True)
            )
    else:
        targets = [f'{org}/{repo}' for repo in gh.get_org_repos(org)]

    workflows = []
    for target in targets:
        workflows.extend(gh.get_workflows(target))
    for workflow in workflows:
        print('workflow: %s job: %s' % (workflow[0], workflow[1]))
if __name__ == '__main__':
    # Command-line entry point: declare the options, parse argv, run the scan.
    parser = ArgumentParser(
        description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests"
    )
    # (flags, keyword settings) in the order they should appear in --help.
    option_table = (
        (("-o", "--org"), {"help": "Github org to check"}),
        (("-m", "--members"), {"action": "store_true", "help": "Check the repos of members of the org"}),
        (("-r", "--repo"), {"help": "Github repo to check"}),
        (("-t", "--token"), {"help": "Github token for authenticated API requests, used in the Authorization header"}),
        (("-v", "--verbose"), {"action": "store_true", "help": "More output"}),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    args = parser.parse_args()
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there any example that is intended to be vulnerable, please?