Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Detect potentially vulnerable github actions workflows for orgs
# more info: https://nathandavison.com/blog/github-actions-and-the-threat-of-malicious-pull-requests
import requests
import yaml
import re
import json
import time
from argparse import ArgumentParser
class GitHub:
def __init__(self, token=None, verbose=False):
self.session = requests.session()
self.verbose = verbose
self.headers = {}
if token:
self.headers['Authorization'] = 'token %s' % (token)
def request(self, url, page=1, attempt=1):
data = ''
errored = False
try:
res = self.session.get(url, headers=self.headers, params={'page': page})
if res.status_code == 404:
return ''
if int(res.headers.get('x-ratelimit-remaining', 1)) == 0:
reset_time = int(res.headers.get('x-ratelimit-reset'))
sleep_time = (reset_time - int(time.time())) + 1
if self.verbose:
print('Rate limiting in effect, sleeping for %s seconds...' % (sleep_time))
time.sleep(sleep_time)
return self.request(url, page, attempt)
elif res.status_code == 403:
if self.verbose:
print('HTTP 403 unrelated to rate limitng, skipping this request')
return ''
errored = False if res.ok else True
except requests.exceptions.ConnectionError:
errored = True
if errored:
if self.verbose:
print('Request errored - retrying attempt %s' % (attempt))
attempt += 1
if attempt > 5:
print('Request failed after 5 attempts - aborting this request.')
return ''
return self.request(url, page, attempt)
try:
data = res.json()
except json.decoder.JSONDecodeError:
data = res.content
return data
def get_org_members(self, org):
page = 1
members = []
while True:
res = self.request('https://api.github.com/orgs/%s/members' % (org), page)
if len(res) == 0:
if self.verbose:
print('Finished finding members for %s' % (org))
break
for member in res:
if 'login' in member:
if self.verbose:
print('Found member %s' % (member['login']))
members.append(member['login'])
page += 1
return members
def get_org_repos(self, org, is_user=False):
page = 1
repos = []
org_type = 'users' if is_user else 'orgs'
while True:
res = self.request('https://api.github.com/%s/%s/repos' % (org_type, org), page)
if not res and not repos:
print('Failed to get repos for "%s"' % (org))
break
if len(res) == 0:
if self.verbose:
print('Finished finding repos for %s' % (org))
break
for repo in res:
if 'name' in repo:
if self.verbose:
print('Found repo %s' % (repo['name']))
repos.append(repo['name'])
page += 1
return repos
def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
vulnerable_workflows = []
yaml_parsed = yaml.safe_load(yaml_raw)
if yaml_parsed:
events = yaml_parsed.get(True, [])
if 'pull_request_target' in events and 'jobs' in yaml_parsed:
for job_name in yaml_parsed['jobs']:
if 'steps' in yaml_parsed['jobs'][job_name]:
for step in yaml_parsed['jobs'][job_name]['steps']:
if 'actions/checkout' in step.get('uses', ''):
with_string = str(step.get('with', ''))
if 'pull_request' in with_string:
vulnerable_workflows.append((yaml_html_url, job_name))
return vulnerable_workflows
def get_workflows(self, repo):
page = 1
vulnerable_workflows = []
while True:
res = self.request('https://api.github.com/repos/%s/actions/workflows' % (repo), page)
if not res:
if self.verbose:
print('Failed to get https://api.github.com/repos/%s/actions/workflows' % (repo))
break
workflows = res.get('workflows', [])
if self.verbose and page == 1:
print('Starting finding workflows for %s' % (repo))
if len(workflows) == 0:
if self.verbose:
print('Finished finding workflows for %s' % (repo))
break
for workflow in workflows:
yaml_path = workflow.get('path')
yaml_html_url = workflow.get('html_url')
if yaml_path and yaml_html_url:
branch_re = re.search(r'%s\/blob/([^/]+)' % (repo), yaml_html_url)
if branch_re:
workflow_branch = branch_re.group(1)
if workflow_branch:
yaml_raw_url = 'https://raw.githubusercontent.com/%s/%s/%s' % (repo, workflow_branch, yaml_path)
if self.verbose:
print('Checking %s...' % (yaml_raw_url))
yaml_raw = self.request(yaml_raw_url)
try:
vulnerable_workflows.extend(self.check_vulnerable_workflows(yaml_html_url, yaml_raw))
except yaml.YAMLError as e:
print(f'Error while parsing yaml for {yaml_html_url}: {e}')
continue
page += 1
return vulnerable_workflows
def main(args):
if not args.org:
print('Must supply a org value')
exit(1)
verbose = args.verbose
org = args.org
token = args.token
check_members = args.members
repos = []
gh = GitHub(token, verbose)
if args.repo:
repos = [args.repo]
else:
if check_members:
members = gh.get_org_members(org)
for member in members:
repos.extend(gh.get_org_repos(member, is_user=True))
else:
repos = gh.get_org_repos(org)
workflows = []
for repo in repos:
workflows.extend(gh.get_workflows(f'{org}/{repo}'))
for workflow in workflows:
print('workflow: %s job: %s' % (workflow[0], workflow[1]))
if __name__ == '__main__':
parser = ArgumentParser(description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests")
parser.add_argument("-o", "--org", help="Github org to check")
parser.add_argument("-m", "--members", action="store_true", help="Check the repos of members of the org")
parser.add_argument("-r", "--repo", help="Github repo to check")
parser.add_argument("-t", "--token", help="Github token for authenticated API requests, used in the Authorization header")
parser.add_argument("-v", "--verbose", action="store_true", help="More output")
args = parser.parse_args()
main(args)
@hackerhumble
Copy link

Is there any example which is intented to be vulnerable pleaase ?

@xflr6
Copy link

xflr6 commented Dec 18, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment