Skip to content

Instantly share code, notes, and snippets.

@ogawatti
Created November 24, 2016 07:43
Show Gist options
  • Save ogawatti/e8dca655c678966e9ed6e5eeb347042b to your computer and use it in GitHub Desktop.
Save ogawatti/e8dca655c678966e9ed6e5eeb347042b to your computer and use it in GitHub Desktop.
Collect Finding
# -*- coding: utf-8 -*-
import boto3
from datetime import datetime
import json
# AWS region used when building both the Inspector and the S3 client.
AWS_REGION = "ap-northeast-1"
# ARN of the Inspector assessment template whose runs are collected
# (account and resource IDs are masked in this copy).
INSPECTOR_TEMPLATE_ARN = "arn:aws:inspector:ap-northeast-1:************:target/**********/template/**********"
# Name of the S3 bucket that receives the exported findings JSON.
S3_BUCKET_NAME = "inspector"
def inspector():
    """Build a boto3 client for Amazon Inspector in ``AWS_REGION``.

    A new client object is constructed on every call.
    """
    service_name = 'inspector'
    return boto3.client(service_name, region_name=AWS_REGION)
def s3():
    """Build a boto3 client for Amazon S3 in ``AWS_REGION``.

    A new client object is constructed on every call.
    """
    service_name = 's3'
    return boto3.client(service_name, region_name=AWS_REGION)
def get_latest_run():
    """Return the most recently created COMPLETED run of the template.

    Pages through ``list_assessment_runs`` for ``INSPECTOR_TEMPLATE_ARN``,
    describes every run ARN that is returned, and picks the description
    with the newest ``createdAt``.

    Returns:
        The assessment-run dict as returned by ``describe_assessment_runs``.

    Raises:
        IndexError: if the template has no COMPLETED assessment runs.
    """
    client = inspector()  # one client reused for every request
    request = {
        'assessmentTemplateArns': [INSPECTOR_TEMPLATE_ARN],
        'filter': {'states': ['COMPLETED']},
        # Page size matches the number of ARNs describe_assessment_runs
        # accepts per call (10), instead of one round trip pair per run.
        'maxResults': 10,
    }
    runs = []
    while True:
        response = client.list_assessment_runs(**request)
        run_arns = response['assessmentRunArns']
        # describe_assessment_runs requires at least one ARN, so skip empty pages.
        if run_arns:
            described = client.describe_assessment_runs(assessmentRunArns=run_arns)
            runs.extend(described['assessmentRuns'])
        # ``in`` replaces dict.has_key(), which does not exist on Python 3.
        if 'nextToken' not in response:
            break
        request['nextToken'] = response['nextToken']
    if not runs:
        raise IndexError(
            "no COMPLETED assessment runs found for template %s" % INSPECTOR_TEMPLATE_ARN
        )
    # max() returns the first of equally-new runs, like a stable descending sort.
    return max(runs, key=lambda run: run['createdAt'])
def get_findings(run_arn):
    """Return the full descriptions of every finding of one assessment run.

    Pages through ``list_findings`` for *run_arn* and resolves each page of
    finding ARNs with ``describe_findings``.

    Args:
        run_arn: ARN of the assessment run whose findings are wanted.

    Returns:
        A list of finding dicts; empty when the run produced no findings.
    """
    client = inspector()  # one client reused for every request
    request = {'assessmentRunArns': [run_arn]}
    findings = []
    while True:
        response = client.list_findings(**request)
        finding_arns = response['findingArns']
        # describe_findings requires at least one ARN; a run without
        # findings (or an empty page) must not trigger the call.
        if finding_arns:
            described = client.describe_findings(findingArns=finding_arns)
            findings.extend(described['findings'])
        # ``in`` replaces dict.has_key(), which does not exist on Python 3.
        if 'nextToken' not in response:
            break
        request['nextToken'] = response['nextToken']
    return findings
def get_rule_packages(rules_package_arns):
    """Look up the rules-package descriptions for the given ARNs.

    Returns the ``rulesPackages`` list of the ``describe_rules_packages``
    response.
    """
    response = inspector().describe_rules_packages(rulesPackageArns=rules_package_arns)
    return response['rulesPackages']
def parse_finding(finding):
    """Reduce a raw Inspector finding to the record that gets exported.

    The rules package is resolved through ``find_rule`` and the labels are
    derived through ``generate_labels``; the remaining fields are copied
    from *finding* (``numericSeverity`` is exported as ``score``).
    """
    record = {}
    record['id'] = finding['id']
    record['severity'] = finding['severity']
    record['score'] = finding['numericSeverity']
    record['rule'] = find_rule(finding['serviceAttributes']['rulesPackageArn'])
    record['labels'] = generate_labels(finding['attributes'])
    # These fields keep their name and value unchanged.
    for field in ('title', 'description', 'recommendation'):
        record[field] = finding[field]
    return record
# For now only the first comma-separated value of the 'package_name'
# attribute is used as the label.
def generate_labels(attributes):
    """Return a one-element label list derived from a finding's attributes.

    Args:
        attributes: iterable of attribute dicts; entries whose ``'key'`` is
            ``'package_name'`` must also carry a ``'value'`` string.

    Returns:
        ``[label]`` where *label* is the text before the first comma of the
        ``package_name`` value (the last such attribute wins), or ``[None]``
        when no ``package_name`` attribute is present.
    """
    label = None
    for attribute in attributes:
        # .get() replaces dict.has_key(), which does not exist on Python 3;
        # a missing 'key' yields None and therefore never matches.
        if attribute.get('key') == 'package_name':
            label = attribute['value'].split(",")[0]
    return [label]
def find_rule(rules_package_arn):
    """Return the rules package whose ``'arn'`` equals *rules_package_arn*.

    Searches the module-level ``RULES_PACKAGES`` list, which
    ``lambda_handler`` assigns before findings are parsed.  When several
    packages share the ARN the last one is returned; ``None`` when there
    is no match.
    """
    matches = [pkg for pkg in RULES_PACKAGES if pkg['arn'] == rules_package_arn]
    if not matches:
        return None
    return matches[-1]
def post_json(data, filename):
    """Upload a JSON document to ``S3_BUCKET_NAME`` under the key *filename*.

    Args:
        data: the serialized JSON document, as text or already-encoded bytes.
        filename: S3 object key to write.

    Returns:
        The ``put_object`` response.
    """
    # Encode text as UTF-8; already-encoded bytes are passed through as-is.
    body = data if isinstance(data, bytes) else data.encode('utf-8')
    return s3().put_object(
        Bucket=S3_BUCKET_NAME,
        Key=filename,
        Body=body,
        # The charset is a Content-Type parameter.  Content-Encoding names a
        # content coding such as gzip, so 'utf-8' is not a valid value for it
        # and is no longer sent.
        ContentType='application/json; charset=utf-8',
    )
def lambda_handler(event, context):
    """AWS Lambda entry point: export the latest run's findings to S3.

    Fetches the latest completed assessment run, its findings and its rules
    packages, serializes the parsed findings as a JSON array, and uploads it
    as ``<run name>.json``.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        The JSON string that was uploaded.
    """
    # find_rule() reads this module-level global, so it has to be assigned
    # before any finding is parsed.
    global RULES_PACKAGES
    latest_run = get_latest_run()
    findings = get_findings(latest_run['arn'])
    RULES_PACKAGES = get_rule_packages(latest_run['rulesPackageArns'])
    # Build a real list: on Python 3 map() returns an iterator, which
    # json.dumps cannot serialize.
    json_data = json.dumps([parse_finding(finding) for finding in findings])
    filename = latest_run['name'] + ".json"
    post_json(json_data, filename)
    return json_data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment