Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Detect potentially vulnerable GitHub Actions workflows for orgs
import requests
import yaml
import re
import json
import time
from argparse import ArgumentParser
class GitHub:
    """Small GitHub REST client that looks for risky Actions workflows.

    A workflow job is reported when the workflow is triggered by
    ``pull_request_target`` and one of the job's steps uses
    ``actions/checkout`` with a ``with:`` block that mentions
    ``pull_request`` (i.e. it appears to check out code from the pull
    request itself).
    """

    API_ROOT = 'https://api.github.com'
    RAW_ROOT = 'https://raw.githubusercontent.com'
    # Total number of tries for a single request before giving up.
    MAX_ATTEMPTS = 5
    # Fallback sleep (seconds) when the rate-limit reset time is unknown.
    DEFAULT_RATE_LIMIT_SLEEP = 3600
    # Per-request timeout (seconds) so a stalled connection cannot hang the scan.
    REQUEST_TIMEOUT = 30

    def __init__(self, token=None, verbose=False):
        """Create the HTTP session.

        Args:
            token: optional GitHub token, sent as ``Authorization: token <token>``.
            verbose: when true, progress messages are printed to stdout.
        """
        self.session = requests.Session()
        self.verbose = verbose
        self.headers = {}
        if token:
            self.headers['Authorization'] = 'token %s' % (token)

    def _log(self, message):
        """Print *message* only when verbose output was requested."""
        if self.verbose:
            print(message)

    @staticmethod
    def _rate_limit_exhausted(headers):
        """Return True when the response headers report zero remaining requests."""
        try:
            return int(headers.get('X-RateLimit-Remaining', 1)) == 0
        except (TypeError, ValueError):
            # A malformed header is treated as "not rate limited".
            return False

    @classmethod
    def _rate_limit_delay(cls, headers):
        """Return how many seconds to sleep before the rate limit resets.

        Uses the ``X-RateLimit-Reset`` epoch timestamp when it is present and
        numeric; otherwise falls back to ``DEFAULT_RATE_LIMIT_SLEEP``.
        """
        try:
            reset_at = int(headers.get('X-RateLimit-Reset'))
        except (TypeError, ValueError):
            return cls.DEFAULT_RATE_LIMIT_SLEEP
        # One extra second avoids waking up just before the window rolls over.
        return max(reset_at - int(time.time()), 0) + 1

    def request(self, url, page=1, attempt=1):
        """GET *url* and return the decoded body.

        Args:
            url: absolute URL to fetch.
            page: value sent as the ``page`` query parameter.
            attempt: number of the first try (kept for backward compatibility).

        Returns:
            ``''`` for a 404 response, the decoded JSON when the body is valid
            JSON, otherwise the raw response bytes.

        Raises:
            Exception: when the request still fails after ``MAX_ATTEMPTS`` tries.
        """
        while True:
            res = None
            try:
                res = self.session.get(
                    url,
                    headers=self.headers,
                    params={'page': page},
                    timeout=self.REQUEST_TIMEOUT,
                )
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Transient network problem: handled by the retry logic below.
                pass
            if res is not None:
                if res.status_code == 404:
                    return ''
                if res.ok:
                    # A successful response is used even if it consumed the
                    # last request of the quota; the wait happens on the next
                    # (rejected) call instead of discarding good data.
                    break
                if self._rate_limit_exhausted(res.headers):
                    delay = self._rate_limit_delay(res.headers)
                    self._log('Rate limiting in effect, sleeping for %d seconds...' % delay)
                    time.sleep(delay)
                    # Waiting out the rate limit does not count as a failed attempt.
                    continue
            self._log('Request errored - retrying attempt %s' % (attempt))
            attempt += 1
            if attempt > self.MAX_ATTEMPTS:
                raise Exception('Request failed after %d attempts - aborting.' % self.MAX_ATTEMPTS)
        try:
            return res.json()
        except ValueError:
            # Response.json() signals a non-JSON body with a ValueError subclass
            # (raw workflow files land here).
            return res.content

    def get_org_repos(self, org):
        """Return the names of all repositories listed for *org*.

        The organisation endpoint is tried first; when its first page yields
        nothing (for example a 404 because *org* is a user account) the user
        endpoint is used instead.
        """
        url = '%s/orgs/%s/repos' % (self.API_ROOT, org)
        tried_user_endpoint = False
        page = 1
        repos = []
        while True:
            res = self.request(url, page)
            if not res and page == 1 and not tried_user_endpoint:
                tried_user_endpoint = True
                url = '%s/users/%s/repos' % (self.API_ROOT, org)
                res = self.request(url, page)
            # An empty page ends pagination; anything that is not a list
            # (raw bytes, an error object) cannot contain repositories.
            if not res or not isinstance(res, list):
                self._log('Finished finding repos for %s' % (org))
                break
            for repo in res:
                if isinstance(repo, dict) and 'name' in repo:
                    self._log('Found repo %s' % (repo['name']))
                    repos.append(repo['name'])
            page += 1
        return repos

    @staticmethod
    def _find_vulnerable_jobs(workflow):
        """Return the job name for every suspicious checkout step in *workflow*.

        *workflow* is a parsed workflow document. A job name appears once per
        matching step, so a job with two matching steps is listed twice.
        """
        if not isinstance(workflow, dict):
            return []
        # PyYAML resolves an unquoted `on` key to the boolean True; a quoted
        # key (or a JSON document) keeps the string 'on'.
        events = workflow.get(True)
        if events is None:
            events = workflow.get('on')
        if isinstance(events, str):
            events = [events]
        if not isinstance(events, (list, dict)) or 'pull_request_target' not in events:
            return []
        jobs = workflow.get('jobs')
        if not isinstance(jobs, dict):
            return []
        flagged = []
        for job_name, job in jobs.items():
            steps = job.get('steps') if isinstance(job, dict) else None
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                uses = str(step.get('uses') or '')
                with_string = str(step.get('with', ''))
                if 'actions/checkout' in uses and 'pull_request' in with_string:
                    flagged.append(job_name)
        return flagged

    def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
        """Return ``(yaml_html_url, job_name)`` tuples for suspicious jobs.

        Args:
            yaml_html_url: URL reported alongside each finding.
            yaml_raw: workflow source as ``str``/``bytes``, or an already
                decoded mapping (``request()`` returns one when the file
                happens to be valid JSON).

        Raises:
            yaml.YAMLError: when the text cannot be parsed.
        """
        if not yaml_raw:
            return []
        if isinstance(yaml_raw, (str, bytes)):
            workflow = yaml.safe_load(yaml_raw)
        else:
            workflow = yaml_raw
        return [(yaml_html_url, job_name) for job_name in self._find_vulnerable_jobs(workflow)]

    @staticmethod
    def _workflow_branch(org, repo, html_url, path):
        """Extract the branch from a ``.../<org>/<repo>/blob/<branch>/<path>`` URL.

        Matching is case-insensitive because the URL may spell the owner or
        repository with different capitalisation than the caller did. Returns
        ``None`` when the URL does not have the expected shape.
        """
        pattern = r'/%s/%s/blob/(.+)' % (re.escape(org), re.escape(repo))
        match = re.search(pattern, html_url, re.IGNORECASE)
        if not match:
            return None
        remainder = match.group(1)
        suffix = '/' + path
        if remainder.endswith(suffix) and len(remainder) > len(suffix):
            # Stripping the known file path keeps branch names containing '/'.
            return remainder[:-len(suffix)]
        # The path did not match literally (e.g. URL-encoded): take the first segment.
        return remainder.split('/', 1)[0]

    def _check_workflow(self, org, repo, workflow):
        """Download one workflow listed by the API and return its findings."""
        if not isinstance(workflow, dict):
            return []
        yaml_path = workflow.get('path')
        yaml_html_url = workflow.get('html_url')
        if not (yaml_path and yaml_html_url):
            return []
        workflow_branch = self._workflow_branch(org, repo, yaml_html_url, yaml_path)
        if not workflow_branch:
            return []
        yaml_raw_url = '%s/%s/%s/%s/%s' % (self.RAW_ROOT, org, repo, workflow_branch, yaml_path)
        self._log('Checking %s...' % (yaml_raw_url))
        yaml_raw = self.request(yaml_raw_url)
        try:
            return self.check_vulnerable_workflows(yaml_html_url, yaml_raw)
        except yaml.YAMLError as e:
            print(f'Error while parsing yaml for {yaml_html_url}: {e}')
            return []

    def get_workflows(self, org, repo):
        """Return findings for every workflow of ``org/repo``.

        Pages through the repository's workflow listing until an empty page
        is returned. A listing that cannot be fetched (404 or a non-JSON
        body) ends the scan of this repository instead of being retried
        forever.
        """
        url = '%s/repos/%s/%s/actions/workflows' % (self.API_ROOT, org, repo)
        page = 1
        vulnerable_workflows = []
        while True:
            res = self.request(url, page)
            if not isinstance(res, dict):
                self._log('Failed to get %s' % (url))
                break
            if page == 1:
                self._log('Starting finding workflows for %s/%s' % (org, repo))
            workflows = res.get('workflows') or []
            if not workflows:
                self._log('Finished finding workflows for %s/%s' % (org, repo))
                break
            for workflow in workflows:
                vulnerable_workflows.extend(self._check_workflow(org, repo, workflow))
            page += 1
        return vulnerable_workflows
def main(args):
    """Scan the requested repositories and print every suspicious workflow job.

    Args:
        args: parsed command-line namespace with ``org``, ``repo``, ``token``
            and ``verbose`` attributes.

    Raises:
        SystemExit: with status 1 when no org was supplied.
    """
    if not args.org:
        print('Must supply a org value')
        # The bare exit() helper is injected by the `site` module for
        # interactive use and is missing under `python -S`; raising
        # SystemExit always works.
        raise SystemExit(1)

    gh = GitHub(args.token, args.verbose)
    # A single repo can be targeted explicitly; otherwise scan them all.
    repos = [args.repo] if args.repo else gh.get_org_repos(args.org)

    workflows = []
    for repo in repos:
        workflows.extend(gh.get_workflows(args.org, repo))

    for workflow in workflows:
        print('workflow: %s job: %s' % (workflow[0], workflow[1]))
if __name__ == '__main__':
    # Script entry point: declare the command-line options, parse them and
    # hand the resulting namespace to main().
    cli = ArgumentParser(description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests")
    option_specs = (
        (("-o", "--org"), {"help": "Github org to check"}),
        (("-r", "--repo"), {"help": "Github repo to check"}),
        (("-t", "--token"), {"help": "Github token for authenticated API requests, used in the Authorization header"}),
        (("-v", "--verbose"), {"action": "store_true", "help": "More output"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    main(cli.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment