Created
November 24, 2016 07:43
Collect Finding
# -*- coding: utf-8 -*- | |
import boto3 | |
from datetime import datetime | |
import json | |
# Region used for both the Inspector and the S3 client.
AWS_REGION = "ap-northeast-1"

# Assessment template whose runs are exported. The account id and resource ids
# are masked with asterisks in this listing and must be replaced before use.
INSPECTOR_TEMPLATE_ARN = (
    "arn:aws:inspector:ap-northeast-1:************:target/**********/template/**********"
)

# Destination bucket for the JSON report.
# NOTE(review): the report lists vulnerabilities found on the assessed hosts -
# confirm this bucket is private and that only the report consumers can read it.
S3_BUCKET_NAME = "inspector"
def inspector():
    """Build a new Inspector client bound to AWS_REGION.

    Every call creates a fresh client; credentials come from boto3's default
    resolution, no keys are passed here.
    """
    client = boto3.client(service_name='inspector', region_name=AWS_REGION)
    return client
def s3():
    """Build a new S3 client bound to AWS_REGION.

    Every call creates a fresh client; credentials come from boto3's default
    resolution, no keys are passed here.
    """
    client = boto3.client(service_name='s3', region_name=AWS_REGION)
    return client
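def get_latest_run():
    """Return the most recently created COMPLETED run of INSPECTOR_TEMPLATE_ARN.

    All pages of list_assessment_runs are walked, each page is expanded with
    describe_assessment_runs, and the run with the greatest 'createdAt' wins.

    Returns:
        The assessment-run dict as returned by describe_assessment_runs.

    Raises:
        LookupError: if the template has no COMPLETED run, so there is
            nothing to report on.
    """
    client = inspector()
    request = {
        'assessmentTemplateArns': [INSPECTOR_TEMPLATE_ARN],
        'filter': {'states': ['COMPLETED']},
        # DescribeAssessmentRuns accepts at most 10 ARNs per call, so a page of
        # 10 keeps every describe call valid while cutting the round trips.
        'maxResults': 10,
    }
    runs = []
    while True:
        response = client.list_assessment_runs(**request)
        run_arns = response['assessmentRunArns']
        # The describe call rejects an empty ARN list, so skip empty pages.
        if run_arns:
            described = client.describe_assessment_runs(assessmentRunArns=run_arns)
            runs.extend(described['assessmentRuns'])
        # "in" replaces dict.has_key(), which no longer exists on Python 3.
        if 'nextToken' not in response:
            break
        request['nextToken'] = response['nextToken']

    if not runs:
        raise LookupError(
            "no COMPLETED assessment run found for " + INSPECTOR_TEMPLATE_ARN
        )
    # max() returns the first of several equally new runs, exactly like
    # sorted(..., reverse=True)[0] did.
    return max(runs, key=lambda run: run['createdAt'])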
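def get_findings(run_arn):
    """Collect the full finding records of one assessment run.

    Args:
        run_arn: ARN of the assessment run whose findings are wanted.

    Returns:
        A list of finding dicts from describe_findings; empty when the run
        produced no findings.
    """
    client = inspector()
    request = {'assessmentRunArns': [run_arn]}
    findings = []
    while True:
        response = client.list_findings(**request)
        finding_arns = response['findingArns']
        # A run without findings yields an empty page, and describe_findings
        # rejects an empty ARN list - without this guard a clean run would
        # crash the export instead of producing an empty report.
        if finding_arns:
            described = client.describe_findings(findingArns=finding_arns)
            # NOTE(review): 'failedItems' in the response is not inspected, so
            # findings that could not be described are silently left out of
            # the report.
            findings.extend(described['findings'])
        # "in" replaces dict.has_key(), which no longer exists on Python 3.
        if 'nextToken' not in response:
            break
        request['nextToken'] = response['nextToken']
    return findings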
def get_rule_packages(rules_package_arns):
    """Look up the rules-package records for the given rules-package ARNs.

    Args:
        rules_package_arns: list of rules-package ARNs to describe.

    Returns:
        The 'rulesPackages' list of the describe_rules_packages response.
    """
    response = inspector().describe_rules_packages(
        rulesPackageArns=rules_package_arns
    )
    return response['rulesPackages']
def parse_finding(finding):
    """Reduce one Inspector finding to the fields written to the report.

    Args:
        finding: a finding dict as returned by describe_findings.

    Returns:
        A dict with the keys id, severity, score, rule, labels, title,
        description and recommendation.

    Raises:
        KeyError: if the finding lacks one of the fields read below.
    """
    # Fields are read in the same order as the report keys are listed above.
    report = {}
    for report_key, finding_key in (
        ('id', 'id'),
        ('severity', 'severity'),
        ('score', 'numericSeverity'),
    ):
        report[report_key] = finding[finding_key]
    report['rule'] = find_rule(finding['serviceAttributes']['rulesPackageArn'])
    report['labels'] = generate_labels(finding['attributes'])
    # The text fields are copied verbatim from the scanner output.
    for text_field in ('title', 'description', 'recommendation'):
        report[text_field] = finding[text_field]
    return report
# Currently only the first value from attributes is set as the label
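def generate_labels(attributes):
    """Derive the report label from a finding's attribute list.

    The label is the first comma-separated entry of the 'package_name'
    attribute; when several such attributes exist, the last one wins.

    Args:
        attributes: list of attribute dicts with 'key' and 'value' entries.

    Returns:
        A one-element list holding the label, or [None] when no
        'package_name' attribute is present.
    """
    label = None
    for attribute in attributes:
        # .get() replaces dict.has_key(), which no longer exists on Python 3;
        # an attribute without 'key' is skipped either way.
        if attribute.get('key') == 'package_name':
            label = attribute['value'].split(",")[0]
    return [label]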
def find_rule(rules_package_arn):
    """Find the rules package with the given ARN in RULES_PACKAGES.

    RULES_PACKAGES is the module-level list that lambda_handler fills before
    findings are parsed.

    Args:
        rules_package_arn: ARN to look for.

    Returns:
        The matching package dict (the last one if several match), or None.
    """
    matches = [
        candidate
        for candidate in RULES_PACKAGES
        if candidate['arn'] == rules_package_arn
    ]
    return matches[-1] if matches else None
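def post_json(data, filename):
    """Upload a JSON document to S3_BUCKET_NAME.

    Args:
        data: the JSON text to store.
        filename: object key to write; an existing object with the same key
            is overwritten.

    Returns:
        The put_object response.
    """
    # ContentEncoding is no longer sent: that header names a content coding
    # such as gzip, and 'utf-8' is not one, so clients honouring the header
    # could fail to read the object. The body is still UTF-8 encoded JSON.
    #
    # NOTE(review): no ACL or encryption arguments are passed, so the object
    # inherits the bucket's settings - confirm the bucket is private and
    # encrypted, because the report lists vulnerabilities of the assessed hosts.
    response = s3().put_object(
        Bucket=S3_BUCKET_NAME,
        Key=filename,
        Body=data.encode('utf-8'),
        ContentType='application/json'
    )
    return response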
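def lambda_handler(event, context):
    """Export the findings of the latest completed assessment run to S3.

    The findings are reduced with parse_finding, serialised to JSON, and
    stored under "<run name>.json" in S3_BUCKET_NAME.

    Args:
        event: Lambda event; not read.
        context: Lambda context; not read.

    Returns:
        The JSON text that was uploaded. The same report is therefore handed
        to whatever invokes the function synchronously.
    """
    global RULES_PACKAGES
    latest_run = get_latest_run()
    findings = get_findings(latest_run['arn'])
    # find_rule() reads this module-level list, so it has to be filled before
    # any finding is parsed.
    RULES_PACKAGES = get_rule_packages(latest_run['rulesPackageArns'])
    # A list instead of map(): on Python 3 map() returns a lazy iterator that
    # json.dumps cannot serialise.
    json_data = json.dumps([parse_finding(finding) for finding in findings])
    filename = latest_run['name'] + ".json"
    post_json(json_data, filename)
    return json_data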