Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Detect potentially vulnerable GitHub Actions workflows for orgs
import requests
import yaml
import re
import json
import time
from argparse import ArgumentParser
class GitHub:
    """Small GitHub REST client that flags risky ``pull_request_target`` workflows.

    The client lists the members and repositories of an org (or user),
    downloads each repository's workflow files and reports the jobs that run
    on ``pull_request_target`` while checking out pull-request supplied code.
    """

    # Number of tries for one request before it is abandoned.
    MAX_ATTEMPTS = 5

    def __init__(self, token=None, verbose=False, timeout=30):
        """Create the client.

        Args:
            token: Optional GitHub token sent in the ``Authorization`` header.
            verbose: When true, progress messages are printed.
            timeout: Seconds to wait on a single HTTP request; without a
                timeout a stalled connection would block the scan forever.
        """
        self.session = requests.session()
        self.verbose = verbose
        self.timeout = timeout
        self.headers = {}
        if token:
            self.headers['Authorization'] = 'token %s' % (token)

    def request(self, url, page=1, attempt=1):
        """GET ``url`` and return the decoded body.

        Args:
            url: Absolute URL to fetch.
            page: Value sent as the ``page`` query parameter.
            attempt: Number of the first attempt (kept for compatibility with
                callers that resume a retry sequence).

        Returns:
            The JSON-decoded body when the body is JSON, the raw ``bytes``
            body otherwise, or ``''`` when the request is given up on (404,
            a 403 that is not a rate limit, or ``MAX_ATTEMPTS`` failures).
        """
        while True:
            res = None
            try:
                res = self.session.get(
                    url,
                    headers=self.headers,
                    params={'page': page},
                    timeout=self.timeout,
                )
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout):
                # Network level failure: fall through to the retry accounting.
                pass

            if res is not None:
                if res.status_code == 404:
                    return ''
                if self._rate_limit_exhausted(res):
                    # Waiting for the quota does not count as a failed attempt.
                    self._sleep_until_reset(res)
                    continue
                if res.status_code == 403:
                    if self.verbose:
                        print('HTTP 403 unrelated to rate limitng, skipping this request')
                    return ''
                if res.ok:
                    try:
                        return res.json()
                    except ValueError:
                        # Every JSON decode error is a ValueError, whichever
                        # decoder backend requests uses; raw workflow files
                        # land here and are returned as bytes.
                        return res.content

            if self.verbose:
                print('Request errored - retrying attempt %s' % (attempt))
            attempt += 1
            if attempt > self.MAX_ATTEMPTS:
                print('Request failed after %s attempts - aborting this request.' % (self.MAX_ATTEMPTS))
                return ''

    @staticmethod
    def _rate_limit_exhausted(res):
        """Return True when the response reports zero remaining API calls."""
        try:
            return int(res.headers.get('x-ratelimit-remaining', 1)) == 0
        except (TypeError, ValueError):
            # Unparseable header: assume quota is available.
            return False

    def _sleep_until_reset(self, res):
        """Sleep until the moment given by the ``x-ratelimit-reset`` header."""
        now = int(time.time())
        try:
            reset_time = int(res.headers.get('x-ratelimit-reset'))
        except (TypeError, ValueError):
            # Header missing or malformed: fall back to the minimal pause.
            reset_time = now
        # Never negative: time.sleep() raises ValueError on negative values.
        sleep_time = max(reset_time - now, 0) + 1
        if self.verbose:
            print('Rate limiting in effect, sleeping for %s seconds...' % (sleep_time))
        time.sleep(sleep_time)

    def get_org_members(self, org):
        """Return the login names of all members listed for ``org``.

        Pages are requested until an empty (or non-list) page is returned.
        """
        members = []
        page = 1
        while True:
            res = self.request('https://api.github.com/orgs/%s/members' % (org), page)
            # A non-list payload cannot hold members; treat it as the end so
            # the loop cannot spin forever on an unexpected body.
            if not isinstance(res, list) or not res:
                if self.verbose:
                    print('Finished finding members for %s' % (org))
                break
            for member in res:
                if isinstance(member, dict) and 'login' in member:
                    if self.verbose:
                        print('Found member %s' % (member['login']))
                    members.append(member['login'])
            page += 1
        return members

    def get_org_repos(self, org, is_user=False):
        """Return ``'<org>/<repo name>'`` for every repository of ``org``.

        Args:
            org: Organisation name, or a user login when ``is_user`` is true.
            is_user: Query the ``users`` endpoint instead of ``orgs``.
        """
        org_type = 'users' if is_user else 'orgs'
        repos = []
        page = 1
        while True:
            res = self.request('https://api.github.com/%s/%s/repos' % (org_type, org), page)
            if not isinstance(res, list):
                # Unexpected payload shape: handle it like an empty page.
                res = []
            if not res:
                if not repos:
                    print('Failed to get repos for "%s"' % (org))
                elif self.verbose:
                    print('Finished finding repos for %s' % (org))
                break
            for repo in res:
                if isinstance(repo, dict) and 'name' in repo:
                    if self.verbose:
                        print('Found repo %s' % (repo['name']))
                    repos.append('%s/%s' % (org, repo['name']))
            page += 1
        return repos

    def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
        """Return ``(yaml_html_url, job_name)`` for each suspicious checkout step.

        Args:
            yaml_html_url: URL reported alongside every finding.
            yaml_raw: Workflow file as text/bytes, or an already decoded
                document (``request()`` JSON-decodes bodies that happen to be
                valid JSON).

        Raises:
            yaml.YAMLError: If ``yaml_raw`` is text that is not valid YAML.
        """
        if not yaml_raw:
            # request() returns '' for files it could not download.
            return []
        if isinstance(yaml_raw, (str, bytes)):
            workflow = yaml.safe_load(yaml_raw)
        else:
            workflow = yaml_raw
        return [(yaml_html_url, job_name)
                for job_name in self._vulnerable_jobs(workflow)]

    @staticmethod
    def _vulnerable_jobs(workflow):
        """List job names with a pull-request checkout under pull_request_target.

        A job is listed once per ``actions/checkout`` step whose ``with``
        settings mention ``pull_request``. Malformed documents yield no
        findings instead of raising.
        """
        if not isinstance(workflow, dict):
            return []
        # PyYAML resolves an unquoted `on` key to the boolean True, while a
        # quoted key (or a JSON document) keeps the string 'on'.
        events = workflow.get(True)
        if events is None:
            events = workflow.get('on')
        if not isinstance(events, (str, list, dict)):
            return []
        if 'pull_request_target' not in events:
            return []
        jobs = workflow.get('jobs')
        if not isinstance(jobs, dict):
            return []

        flagged = []
        for job_name, job in jobs.items():
            steps = job.get('steps') if isinstance(job, dict) else None
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                uses = str(step.get('uses') or '')
                if 'actions/checkout' not in uses:
                    continue
                if 'pull_request' in str(step.get('with', '')):
                    flagged.append(job_name)
        return flagged

    @staticmethod
    def _workflow_branch(repo, html_url, path):
        """Extract the branch from a workflow ``html_url``.

        The URL is expected to look like ``.../<repo>/blob/<branch>/<path>``.
        Removing the known ``path`` suffix keeps branch names that contain
        ``/`` intact. The repo is matched literally and case-insensitively,
        because the caller's spelling may differ from the one in the URL.
        Returns ``None`` when no branch can be determined.
        """
        match = re.search(r'%s/blob/(.+)' % (re.escape(repo)), html_url, re.IGNORECASE)
        if not match:
            return None
        ref_and_path = match.group(1)
        suffix = '/' + path
        if ref_and_path.endswith(suffix):
            return ref_and_path[:-len(suffix)] or None
        # Path does not line up with the URL: fall back to the first segment.
        return ref_and_path.split('/', 1)[0] or None

    def get_workflows(self, repo):
        """Scan every workflow of ``repo`` (``'owner/name'``) and return the findings.

        Returns:
            A list of ``(workflow html_url, job name)`` tuples.
        """
        vulnerable_workflows = []
        page = 1
        while True:
            res = self.request('https://api.github.com/repos/%s/actions/workflows' % (repo), page)
            if not res or not isinstance(res, dict):
                if self.verbose:
                    print('Failed to get https://api.github.com/repos/%s/actions/workflows' % (repo))
                break
            workflows = res.get('workflows') or []
            if self.verbose and page == 1:
                print('Starting finding workflows for %s' % (repo))
            if not workflows:
                if self.verbose:
                    print('Finished finding workflows for %s' % (repo))
                break
            for workflow in workflows:
                if not isinstance(workflow, dict):
                    continue
                yaml_path = workflow.get('path')
                yaml_html_url = workflow.get('html_url')
                if not (yaml_path and yaml_html_url):
                    continue
                workflow_branch = self._workflow_branch(repo, yaml_html_url, yaml_path)
                if not workflow_branch:
                    continue
                yaml_raw_url = 'https://raw.githubusercontent.com/%s/%s/%s' % (repo, workflow_branch, yaml_path)
                if self.verbose:
                    print('Checking %s...' % (yaml_raw_url))
                yaml_raw = self.request(yaml_raw_url)
                try:
                    vulnerable_workflows.extend(self.check_vulnerable_workflows(yaml_html_url, yaml_raw))
                except yaml.YAMLError as e:
                    # One unparseable workflow must not stop the whole scan.
                    print(f'Error while parsing yaml for {yaml_html_url}: {e}')
            page += 1
        return vulnerable_workflows
def main(args):
    """Run the scan described by the parsed command-line ``args``.

    ``args`` must provide ``org``, ``repo``, ``token``, ``members`` and
    ``verbose``. Repositories are selected in this order of precedence:
    the single ``repo`` if given, else the repositories of every member of
    ``org`` when ``members`` is set, else the repositories of ``org`` itself.
    Each finding is printed as ``workflow: <url> job: <name>``.

    Raises:
        SystemExit: With code 1 when neither ``org`` nor ``repo`` is given.
    """
    # An org is only needed to enumerate repositories; an explicit repo is
    # scanned directly, so it is enough on its own.
    if not args.org and not args.repo:
        print('Must supply an org (-o) or a repo (-r) value')
        # SystemExit instead of exit(): exit() is a helper installed by the
        # `site` module and is not guaranteed to exist.
        raise SystemExit(1)

    org = args.org
    gh = GitHub(args.token, args.verbose)

    if args.repo:
        repos = [args.repo]
    elif args.members:
        repos = []
        for member in gh.get_org_members(org):
            repos.extend(gh.get_org_repos(member, is_user=True))
    else:
        repos = gh.get_org_repos(org)

    # Collect first and print afterwards so the findings are not interleaved
    # with the verbose progress output.
    workflows = []
    for repo in repos:
        workflows.extend(gh.get_workflows(repo))
    for workflow in workflows:
        print('workflow: %s job: %s' % (workflow[0], workflow[1]))
if __name__ == '__main__':
    # Script entry point: declare the command-line options, parse them and
    # hand the resulting namespace to main().
    cli = ArgumentParser(
        description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests",
    )
    cli.add_argument(
        "-o", "--org",
        help="Github org to check",
    )
    cli.add_argument(
        "-m", "--members",
        action="store_true",
        help="Check the repos of members of the org",
    )
    cli.add_argument(
        "-r", "--repo",
        help="Github repo to check",
    )
    cli.add_argument(
        "-t", "--token",
        help="Github token for authenticated API requests, used in the Authorization header",
    )
    cli.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="More output",
    )
    main(cli.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment