Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Get Google Container Registry image vulnerabilities
#!/bin/env python3
from datetime import date, datetime
import json
import logging
import subprocess
# Get all the Docker images in the repository
import uuid
from datetime import date
from typing import List, Any
def get_gcr_images_list(repository):
    """Return the images that ``gcloud`` lists for a repository.

    Runs ``gcloud container images list --repository=<repository>
    --format=json`` and decodes its standard output as JSON.

    Args:
        repository: Value appended to ``--repository=``.

    Returns:
        The decoded JSON document printed by ``gcloud``.

    Raises:
        RuntimeError: If ``gcloud`` exits with a non-zero status. The message
            carries the exit code and whatever ``gcloud`` wrote to stderr.
        json.JSONDecodeError: If the command succeeds but its output is not
            valid JSON.
    """
    command = ['gcloud', 'container', 'images', 'list', '--repository=' + repository, '--format=json']
    completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Without this check a failed command leaves stdout empty and the caller
    # only sees an opaque JSON decoding error instead of gcloud's own message.
    if completed.returncode != 0:
        raise RuntimeError(
            'gcloud container images list failed for {} (exit status {}): {}'.format(
                repository,
                completed.returncode,
                completed.stderr.decode('utf-8', errors='replace').strip()))
    return json.loads(completed.stdout)
def get_image_tags_list(images):
    """Return the ``gcloud`` tag listing of every image that has at least one entry.

    For each image, runs ``gcloud container images list-tags <name>
    --format=json`` and decodes the output.

    Args:
        images: Iterable of mappings; each one's ``'name'`` value is passed
            to ``gcloud`` as the image to list.

    Returns:
        A list with one element per image whose listing is non-empty. Each
        element is the decoded listing for that image, and its *first* entry
        gets an extra ``'image'`` key holding the image name.

    Raises:
        RuntimeError: If ``gcloud`` exits with a non-zero status for an image.
        json.JSONDecodeError: If the command output is not valid JSON.
    """
    result: List[Any] = []
    for image in images:
        image_name = image.get('name')
        completed = subprocess.run(
            ['gcloud', 'container', 'images', 'list-tags', image_name, '--format=json'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Surface gcloud's own error text rather than a JSON decoding error
        # caused by an empty stdout.
        if completed.returncode != 0:
            raise RuntimeError(
                'gcloud container images list-tags failed for {} (exit status {}): {}'.format(
                    image_name,
                    completed.returncode,
                    completed.stderr.decode('utf-8', errors='replace').strip()))
        # Each entry is expected to carry the full digest and the timestamp.
        tags = json.loads(completed.stdout)
        if not tags:
            # Nothing listed for this image, so there is nothing to report on.
            continue
        # Stick the image name into the JSON.
        # NOTE(review): only the first entry is annotated; entries without the
        # 'image' key are skipped by main() -- confirm that this is intended.
        tags[0]['image'] = image_name
        result.append(tags)
        print('image-name: ' + image_name + ' ' + 'image-tags: ' + str(len(tags)))
    return result
def find_vulnerabilities_for_image_by_digest(resource_url, project_id):
    """Collect the vulnerability occurrences recorded for one resource URL.

    Builds a Container Analysis client, obtains its Grafeas client and lists
    the occurrences of ``project_id`` that match the filter
    ``kind="VULNERABILITY" AND resourceUrl="<resource_url>"``.

    Args:
        resource_url: Resource URL placed in the occurrence filter.
        project_id: Project identifier handed to ``project_path``.

    Returns:
        A list holding every occurrence yielded by ``list_occurrences``.
    """
    from google.cloud.devtools import containeranalysis_v1

    # TODO move the client up another level so I'm not hammering the API, open a session and leave it open?
    analysis_client = containeranalysis_v1.ContainerAnalysisClient()
    grafeas_client = analysis_client.get_grafeas_client()
    parent = grafeas_client.project_path(project_id)
    occurrence_filter = f'kind="VULNERABILITY" AND resourceUrl="{resource_url}"'

    found = []
    for occurrence in grafeas_client.list_occurrences(parent, occurrence_filter):
        # One progress line per occurrence.
        print('Adding vuln for: ' + resource_url)
        # logger('Adding vuln for: ' + resource_url)
        found.append(occurrence)
    return found
def _severity_for_score(cvss_score):
    """Map a CVSS v3 score to the severity label written to the report."""
    # CVSS V3 SCORE RANGE    SEVERITY IN ADVISORY
    # 0.1 - 3.9              Low
    # 4.0 - 6.9              Medium
    # 7.0 - 8.9              High
    # 9.0 - 10.0             Critical
    # Lower-bound thresholds leave no gap between bands, so a score such as
    # 3.9000000953674316 (3.9 stored as a 32-bit float) still gets a label.
    if cvss_score >= 9.0:
        return 'Critical'
    if cvss_score >= 7.0:
        return 'High'
    if cvss_score >= 4.0:
        return 'Medium'
    if cvss_score >= 0:
        return 'Low'
    return ''


def _format_report_row(vuln_occurrence, registry_prefix):
    """Render one occurrence as a CSV line, trailing newline included."""
    import csv
    import io

    vulnerability = vuln_occurrence.vulnerability
    issue = vulnerability.package_issue[0]
    resource_uri = str(vuln_occurrence.resource_uri)
    image_ref = resource_uri.replace(registry_prefix, "")
    detected = str(date.fromtimestamp(vuln_occurrence.create_time.seconds))
    fixed_version = issue.fixed_version.full_name
    # Strip the list/label decoration from the textual form of related_urls.
    advisory_url = (str(vulnerability.related_urls)
                    .replace("[url: ", "")
                    .replace("\"", "")
                    .replace("\n", "")
                    .replace("label: More Info]", ""))
    fields = [
        # Vulnerability: package issue, affected package
        str(issue.affected_package),
        # Detected: date first detected
        detected,
        # Title*: a random suffix keeps the title unique
        image_ref + ":" + issue.fixed_package + ":" + issue.fixed_cpe_uri + ":" + fixed_version + ":"
        + str(uuid.uuid4().hex),
        # CVSS Score
        str(vulnerability.cvss_score),
        # CVE: the CVE ID
        str(vulnerability.short_description),
        # Reference URL: source for this finding
        resource_uri,
        # Description: image and tag
        image_ref,
        # Remediation: recommended remediation
        issue.fixed_package + " " + issue.fixed_cpe_uri + " " + fixed_version,
        # Owner*: team who owns the remediation (left empty)
        "",
        # Effective Date: when the timer starts for this finding
        detected,
        # Stop Date: end date for any exception granted this finding (left empty)
        "",
        # Status*: status of this finding, Draft, Active, etc..
        "Draft",
        # Attachments, Primary Contact, Secondary Contact (left empty)
        "",
        "",
        "",
        # Vulnerability URL: CVE vendor source for the advisory
        advisory_url,
        # Affected Repo
        image_ref.replace("@sha256", ""),
        # Severity
        _severity_for_score(vulnerability.cvss_score),
    ]
    # csv.writer quotes any field containing a comma, quote or newline, so
    # such a value can no longer shift the columns that follow it.
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="").writerow(fields)
    # Tags column and trailing padding, kept exactly as the template expects.
    return buffer.getvalue() + "," + ",,,,,,,,,,,,,,,,,,," + "\n"


def write_report(vuln_occurrence_list_list, registry_prefix="https://gcr.io/some-registry/"):
    """Write a CSV report of the occurrences that have a fix available.

    The file is created in the current working directory and is named
    ``container_cve_report_<YYYYmmdd-HHMMSS>.csv``. It starts with the two
    header lines of ZenGRC's vulnerability object table, followed by one row
    per reported occurrence.

    An occurrence is reported only when its first package issue has
    ``fix_available`` set; occurrences with no package issue are skipped.

    Args:
        vuln_occurrence_list_list: Iterable of iterables of vulnerability
            occurrences.
        registry_prefix: Prefix removed from ``resource_uri`` to build the
            shortened image reference used in several columns.
    """
    csv_filename = "container_cve_report_" + datetime.now().strftime("%Y%m%d-%H%M%S") + ".csv"
    # ZenGRC's vulnerability object table: columns to write. The text is kept
    # byte-for-byte, including the spaces after 'Effective Date,' and 'Status*,'.
    header = (
        "Object type,\"Must be unique. Can be left empty for autogeneration. If updating or deleting, "
        "code is required\",,,,,,,,,,,,,,,,None,None,None,None,None,None,None,,,,,,,,,,,,,," + '\n'
        'Vulnerability,'  # What is the package issue?
        'Detected,'  # Date Detected
        'Title*,'  # Title for this object
        'CVSS Score,'  # CVSS Score
        'CVE,'  # CVE ID
        'Reference URL,'  # Source for the finding, ie: GCP/GCR, SHA, GitHub, etc URL
        'Description,'  # Image and Tag
        'Remediation,'  # Recommended remediation for this finding
        'Owner*,'  # Team who owns the remediation (how to automate this?)
        'Effective Date, '  # When the timer starts for this finding
        'Stop Date,'  # End date for any exception granted this finding
        'Status*, '  # Status of this finding, Draft, Active, etc..
        'Attachments,'  # Any attachments
        'Primary Contact,'  # Primary owner for this finding
        'Secondary Contact,'  # Alternate owner for this finding
        'Vulnerability URL,'  # Source for the CVE Advisory
        'Affected Repo,'  # What repo is affected?
        'Severity,'  # What is the finding's severity rank
        'Tags,,,,,,,,,,,,,,,,,,,,' + '\n')  # Add any useful tags here
    # The context manager closes the file even if a row cannot be built.
    with open(csv_filename, "w") as f:
        f.write(header)
        for vuln_occurrence_list in vuln_occurrence_list_list:
            for vuln_occurrence in vuln_occurrence_list:
                # To filter on the score instead, test
                # vuln_occurrence.vulnerability.cvss_score >= 7 here.
                package_issues = vuln_occurrence.vulnerability.package_issue
                if not package_issues or not package_issues[0].fix_available:
                    continue
                row = _format_report_row(vuln_occurrence, registry_prefix)
                try:
                    f.write(row)
                except IOError as err:
                    # Best effort: keep going, but leave a trace of the lost row.
                    logging.warning("Could not write report row for %s: %s",
                                    vuln_occurrence.resource_uri, err)
                    continue
def logger(log_obj):
    """Emit a warning about ``log_obj`` on the ``gcp.logger`` logger.

    Args:
        log_obj: What to report. A dict is attached to the log record through
            ``extra`` (its keys become record attributes) under the fixed
            message ``Runtime problem: issue was``. Anything else, such as a
            string or an exception, is rendered into the message itself.
    """
    log_format = '%(asctime)-15s %(message)s'
    # basicConfig does nothing once the root logger already has handlers.
    logging.basicConfig(format=log_format)
    logger_obj = logging.getLogger('gcp.logger')
    if isinstance(log_obj, dict):
        logger_obj.warning('Runtime problem: %s', 'issue was', extra=log_obj)
    else:
        # ``extra`` must be a mapping; passing a string or an exception there
        # raises TypeError, so put the object in the message instead.
        logger_obj.warning('Runtime problem: %s', log_obj)
def main():
    """Build the vulnerability report for the ``gcr.io/some-registry`` repository.

    Lists the repository's images, lists the tags of each image, looks up the
    vulnerability occurrences for each annotated digest and writes the
    non-empty results to a CSV report.
    """
    # 1. get a full list of our images:
    images_list = get_gcr_images_list('gcr.io/some-registry')
    # 2. get a full list of image tags (get all of the full digests for our image, timestamp)
    image_tags_list = get_image_tags_list(images_list)
    # 3. get a list of vulnerability occurrences
    vuln_occurrence_list = []
    for image_tag in image_tags_list:
        for image_dict in image_tag:
            try:
                # Entries without an 'image' key raise KeyError here and are skipped.
                image_name = str(image_dict['image'])
                resource_url = 'https://' + image_name + '@' + str(image_dict.get('digest'))
                vuln_occurrences = find_vulnerabilities_for_image_by_digest(resource_url, 'some-registry')
                if vuln_occurrences:
                    # Reuse the result already fetched instead of querying the
                    # API a second time for the same digest.
                    vuln_occurrence_list.append(vuln_occurrences)
            except KeyError:
                # logger(e)
                continue
    # Write our report file or google sheet
    write_report(vuln_occurrence_list)
    # update_sheet(sheet_id, vuln_occurrence_list, './credentials.json')
# Run the report only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment