Last active
June 22, 2020 21:56
-
-
Save gjyoung1974/779baf130d18b0fae9512b6953723dc2 to your computer and use it in GitHub Desktop.
Get Google Container Registry image vulnerabilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
from datetime import date, datetime | |
import json | |
import logging | |
import subprocess | |
# Get all the Docker images in the repository | |
import uuid | |
from datetime import date | |
from typing import List, Any | |
def get_gcr_images_list(repository):
    """List the Docker images in a container registry repository.

    Runs ``gcloud container images list --repository=<repository>
    --format=json`` and decodes its standard output as JSON.

    Args:
        repository: Repository path handed to gcloud, for example
            ``gcr.io/some-registry``.

    Returns:
        The decoded JSON document printed by gcloud. The callers in this
        file iterate it and read a ``name`` key from every entry.

    Raises:
        subprocess.CalledProcessError: gcloud exited with a non-zero status;
            the captured stderr is available on the exception.
        json.JSONDecodeError: gcloud's stdout was not valid JSON.
    """
    completed = subprocess.run(
        ['gcloud', 'container', 'images', 'list', '--repository=' + repository, '--format=json'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        # Fail here, with gcloud's own exit status and stderr attached,
        # instead of failing later while parsing an empty stdout.
        check=True)
    return json.loads(completed.stdout)
def get_image_tags_list(images):
    """Collect the tag listing of every image.

    For each entry of ``images`` this runs ``gcloud container images
    list-tags <name> --format=json`` and decodes the output. Images whose tag
    listing is empty are skipped.

    Args:
        images: Iterable of mappings with a ``name`` key, as returned by
            ``get_gcr_images_list``.

    Returns:
        A list with one element per image that has at least one tag entry.
        Each element is the decoded tag listing of that image; only its
        FIRST entry additionally carries the image name under ``'image'``.

    Raises:
        subprocess.CalledProcessError: gcloud exited with a non-zero status.
        json.JSONDecodeError: gcloud's stdout was not valid JSON.
    """
    result: List[Any] = []
    for image in images:
        image_name = image.get('name')
        completed = subprocess.run(
            ['gcloud', 'container', 'images', 'list-tags', image_name, '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True)
        # The tag entries carry the full digest and the timestamp.
        tags = json.loads(completed.stdout)
        if not tags:
            # Nothing to annotate or report for an image without tags.
            continue
        # NOTE(review): only the first entry is annotated, so consumers that
        # require the 'image' key see one entry per image - confirm that
        # this is intended rather than an oversight.
        tags[0]['image'] = image_name
        result.append(tags)
        print('image-name: ' + image_name + ' ' + 'image-tags: ' + str(len(tags)))
    return result
# Clients shared by every lookup; created lazily on first use.
_CONTAINER_ANALYSIS_CLIENT = None
_GRAFEAS_CLIENT = None


def _get_grafeas_client():
    """Return the shared Grafeas client, creating it on the first call."""
    global _CONTAINER_ANALYSIS_CLIENT, _GRAFEAS_CLIENT
    if _GRAFEAS_CLIENT is None:
        # Imported lazily so the module can be loaded without the Google
        # client library installed.
        from google.cloud.devtools import containeranalysis_v1
        # The parent client is kept referenced alongside the Grafeas client
        # derived from it.
        _CONTAINER_ANALYSIS_CLIENT = containeranalysis_v1.ContainerAnalysisClient()
        _GRAFEAS_CLIENT = _CONTAINER_ANALYSIS_CLIENT.get_grafeas_client()
    return _GRAFEAS_CLIENT


def find_vulnerabilities_for_image_by_digest(resource_url, project_id):
    """Return the vulnerability occurrences recorded for one image digest.

    Queries the project's occurrences with the filter
    ``kind="VULNERABILITY" AND resourceUrl="<resource_url>"``.

    Args:
        resource_url: URL of the image; it is inserted verbatim into the
            filter expression.
        project_id: Project identifier used to build the project path.

    Returns:
        A list holding every occurrence yielded by the query, in the order
        the API returned them.
    """
    # Reuse one client for all lookups instead of building a new one per
    # image digest.
    grafeas_client = _get_grafeas_client()
    project_name = grafeas_client.project_path(project_id)
    filter_str = 'kind="VULNERABILITY" AND resourceUrl="{}"'.format(resource_url)
    vulnerabilities = grafeas_client.list_occurrences(project_name, filter_str)
    filtered_list = []
    for v in vulnerabilities:
        print('Adding vuln for: ' + resource_url)
        filtered_list.append(v)
    return filtered_list
def _cvss_severity(cvss_score):
    """Map a CVSS score to the severity label used in the report.

    CVSS v3 ranges: below 4.0 Low, below 7.0 Medium, below 9.0 High,
    9.0 and above Critical. Half-open buckets are used so that every
    non-negative score gets a label, including values such as
    6.900000095 that fall between the published range boundaries.
    Negative or NaN scores yield an empty label.
    """
    if not cvss_score >= 0:
        return ''
    if cvss_score < 4.0:
        return 'Low'
    if cvss_score < 7.0:
        return 'Medium'
    if cvss_score < 9.0:
        return 'High'
    return 'Critical'


def _related_urls_text(related_urls):
    """Reduce the textual form of ``related_urls`` to a plain string.

    Strips the ``[url: `` prefix, double quotes, newlines and the
    ``label: More Info]`` suffix from ``str(related_urls)``.
    """
    return (str(related_urls)
            .replace("[url: ", "")
            .replace("\"", "")
            .replace("\n", "")
            .replace("label: More Info]", ""))


def write_report(vuln_occurrence_list_list):
    """Write the fixable vulnerability occurrences to a timestamped CSV file.

    The file is created in the current working directory and is named
    ``container_cve_report_<YYYYmmdd-HHMMSS>.csv``. It starts with two fixed
    header lines, followed by one row per occurrence whose first package
    issue has ``fix_available`` set. Occurrences without any package issue,
    or without an available fix, are skipped.

    Args:
        vuln_occurrence_list_list: Iterable of iterables of occurrence
            objects. Each occurrence is read through ``vulnerability``
            (``package_issue``, ``cvss_score``, ``short_description``,
            ``related_urls``), ``create_time.seconds`` and ``resource_uri``.
    """
    import csv  # local import: only this function needs it

    csv_filename = "container_cve_report_" + str(datetime.now().strftime("%Y%m%d-%H%M%S")) + ".csv"
    registry_prefix = "https://gcr.io/some-registry/"
    # Commas that close each data row after the Severity column.
    row_tail = ",,,,,,,,,,,,,,,,,,,"
    log = logging.getLogger(__name__)

    # The context manager closes the file even when a row fails to build.
    with open(csv_filename, "w") as f:
        # ZenGRC's vulnerability object table: columns to write:
        f.write("Object type,\"Must be unique. Can be left empty for autogeneration. If updating or deleting, "
                "code is required\",,,,,,,,,,,,,,,,,None,None,None,None,None,None,None,,,,,,,,,,,,,," + '\n'
                'Vulnerability,'  # What is the package issue?
                'Detected,'  # Date Detected
                'Title*,'  # Title for this object
                'CVSS Score,'  # CVSS Score
                'CVE,'  # CVE ID
                'Reference URL,'  # Source for the finding, ie: GCP/GCR, SHA, GitHub, etc URL
                'Description,'  # Image and Tag
                'Remediation,'  # Recommended remediation for this finding
                'Owner*,'  # Team who owns the remediation
                'Effective Date, '  # When the timer starts for this finding
                'Stop Date,'  # End date for any exception granted this finding
                'Status*, '  # Status of this finding, Draft, Active, etc..
                'Attachments,'  # Any attachments
                'Primary Contact,'  # Primary owner for this finding
                'Secondary Contact,'  # Alternate owner for this finding
                'Vulnerability URL,'  # Source for the CVE Advisory
                'Affected Repo,'  # What repo is affected?
                'Severity,'  # What is the finding's severity rank
                'Tags,,,,,,,,,,,,,,,,,,,,' + '\n')  # Add any useful tags here

        # csv.writer quotes a field only when it contains a comma, quote or
        # newline, so such values no longer shift the columns of their row.
        writer = csv.writer(f, lineterminator="\n")

        for vuln_occurrence_list in vuln_occurrence_list_list:
            for vuln_occurrence in vuln_occurrence_list:
                vulnerability = vuln_occurrence.vulnerability
                package_issues = vulnerability.package_issue
                # Only findings that have a fix are reported.
                if not package_issues or not package_issues[0].fix_available:
                    continue
                issue = package_issues[0]

                resource_uri = str(vuln_occurrence.resource_uri)
                image_ref = resource_uri.replace(registry_prefix, "")
                fix_parts = (issue.fixed_package,
                             issue.fixed_cpe_uri,
                             issue.fixed_version.full_name)
                detected = str(date.fromtimestamp(vuln_occurrence.create_time.seconds))

                fields = [
                    # Vulnerability: affected package
                    str(issue.affected_package),
                    # Detected: date first detected
                    detected,
                    # Title*: the random suffix keeps every title unique
                    image_ref + ":" + ":".join(fix_parts) + ":" + str(uuid.uuid4().hex),
                    # CVSS Score
                    str(vulnerability.cvss_score),
                    # CVE: the CVE ID
                    str(vulnerability.short_description),
                    # Reference URL: source for this finding
                    resource_uri,
                    # Description: image and tag
                    image_ref,
                    # Remediation: recommended remediation
                    " ".join(fix_parts),
                    # Owner*: left empty
                    "",
                    # Effective Date: when the timer starts for this finding
                    detected,
                    # Stop Date: left empty
                    "",
                    # Status*
                    "Draft",
                    # Attachments, Primary Contact, Secondary Contact: left empty
                    "",
                    "",
                    "",
                    # Vulnerability URL: source for the advisory
                    _related_urls_text(vulnerability.related_urls),
                    # Affected Repo
                    image_ref.replace("@sha256", ""),
                    # Severity
                    _cvss_severity(vulnerability.cvss_score),
                ]
                # One empty field per comma of the row tail, plus the empty
                # field after the last comma (the Tags column and padding).
                row = fields + [""] * (len(row_tail) + 1)
                try:
                    writer.writerow(row)
                except OSError as err:
                    # Best effort as before, but no longer silent.
                    log.warning("Could not write report row for %s: %s", resource_uri, err)
                    continue
def logger(log_obj):
    """Log ``log_obj`` as a warning on the ``gcp.logger`` logger.

    Args:
        log_obj: Any object (a message string, an exception, ...); its string
            form is placed into the ``Runtime problem: ...`` message.
    """
    log_format = '%(asctime)-15s %(message)s'
    # basicConfig only configures the root logger when it has no handlers
    # yet, so calling it on every invocation is harmless.
    logging.basicConfig(format=log_format)
    logger_obj = logging.getLogger('gcp.logger')
    # Pass the object as the message argument. Handing it over as `extra`
    # raised TypeError for anything that is not a mapping and never showed
    # the object in the output.
    logger_obj.warning('Runtime problem: %s', log_obj)
def main():
    """Scan the registry's images and write the vulnerability report.

    Steps:
      1. List the images of ``gcr.io/some-registry``.
      2. Fetch the tag listing (digest, timestamp) of every image.
      3. Look up the vulnerability occurrences of every tag entry that
         carries an ``'image'`` key.
      4. Write all non-empty results to the CSV report.
    """
    # 1. get a full list of our images:
    images_list = get_gcr_images_list('gcr.io/some-registry')

    # 2. get a full list of image tags (the full digests and timestamps)
    image_tags_list = get_image_tags_list(images_list)

    # 3. get a list of vulnerability occurrences
    vuln_occurrence_list = []
    for image_tag in image_tags_list:
        for image_dict in image_tag:
            try:
                image_name = str(image_dict['image'])
            except KeyError:
                # Entries without an image name cannot be turned into a
                # resource URL, so they are skipped.
                continue
            resource_url = 'https://' + image_name + '@' + str(image_dict.get('digest'))
            # Query once and reuse the result; the lookup is a remote call.
            vuln_occurrences = find_vulnerabilities_for_image_by_digest(resource_url, 'some-registry')
            if vuln_occurrences:
                vuln_occurrence_list.append(vuln_occurrences)

    # Write our report file or google sheet
    write_report(vuln_occurrence_list)
    # update_sheet(sheet_id, vuln_occurrence_list, './credentials.json')
# Run the scan only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment