Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save atheiman/8b8056ad756544ec475203aa8b6076ee to your computer and use it in GitHub Desktop.
Save atheiman/8b8056ad756544ec475203aa8b6076ee to your computer and use it in GitHub Desktop.
Lambda Function to update Security Hub Findings attributes "UserDefinedFields" and "Note" to include AWS account and OrganizationalUnit metadata
Resources:
SecurityHubFindingUpdateFunction:
Type: AWS::Lambda::Function
Properties:
Description: Applies metadata to Security Hub findings
Role: !Sub '${SecurityHubFindingUpdateFunctionRole.Arn}'
# ReservedConcurrentExecutions can be used to throttle the function if invocations get too
# high. However, all findings may not be updated.
#ReservedConcurrentExecutions: 3
Environment:
Variables:
# This string is also set in event rule pattern
UPDATED_USER_DEFINED_FIELDS_KEY: '_AccountDataAdded'
Handler: index.handler
Timeout: 20
Runtime: python3.11
Tags:
- Key: CfnStackId
Value: !Ref AWS::StackId
Code:
ZipFile: |
import boto3
import botocore
import json
import os
import time
from functools import lru_cache
from datetime import datetime, timezone
orgs = boto3.client("organizations", region_name=os.environ["AWS_REGION"])
sechub = boto3.client("securityhub", region_name=os.environ["AWS_REGION"])
updated_user_defined_fields_key = os.environ["UPDATED_USER_DEFINED_FIELDS_KEY"]
def get_orgs_resource_tags_dict(resource_id):
# Reform tags list of dictionaries as a single dictionary
tags_list = orgs.list_tags_for_resource(ResourceId=resource_id)["Tags"]
return {t["Key"]: t["Value"] for t in tags_list}
@lru_cache
def get_organizational_parent(child_id, ttl_hash=None):
del ttl_hash # ttl_hash is only used to expire cache entries in lru_cache
parent = orgs.list_parents(ChildId=child_id)["Parents"][0]
if parent["Type"] == "ROOT":
parent_data = orgs.list_roots()["Roots"][0]
else:
parent_data = orgs.describe_organizational_unit(OrganizationalUnitId=parent["Id"])["OrganizationalUnit"]
parent = {**parent, **parent_data}
for k in list(parent.keys()):
# remove unwanted attributes from api response(s)
if k not in ["Id", "Type", "Name"]:
parent.pop(k)
parent["Tags"] = get_orgs_resource_tags_dict(parent["Id"])
return parent
@lru_cache
def get_account_data(account_id, ttl_hash=None):
del ttl_hash # ttl_hash is only used to expire cache entries in lru_cache
account = orgs.describe_account(AccountId=account_id)["Account"]
for k in list(account.keys()):
# remove unwanted attributes from api response(s)
if k not in ["Id", "Email", "Name"]:
account.pop(k)
account["Tags"] = get_orgs_resource_tags_dict(account["Id"])
child_id = account["Id"]
parents = []
while not parents or not parents[-1]["Type"] == "ROOT":
if parents:
child_id = parents[-1]["Id"]
parents.append(get_organizational_parent(child_id, ttl_hash=get_ttl_hash()))
parents.reverse()
account["OrganizationalUnit"] = parents[-1]
# Build OU path using OU names - /Root/path/to/OU
account["OrganizationalUnit"]["Path"] = "/" + "/".join([p["Name"] for p in parents])
return account
# See https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live
def get_ttl_hash(seconds=3600):
"""Return the same value within `seconds` time period"""
return round(time.time() / seconds)
def dict_dot_notation(d, path=[]):
d2 = {}
for k, v in d.items():
k_path = path + [str(k)]
k_formatted = ".".join(k_path)
if isinstance(v, dict):
if len(v.keys()) == 0:
# handle empty dict
d2[k_formatted] = str(v)
# merge in dict with recursive call
d2 = {**d2, **dict_dot_notation(v, path=k_path)}
elif isinstance(v, list) or isinstance(v, tuple):
# handle list / tuple as comma separated strings
d2[k_formatted] = ",".join([str(i) for i in v])
else:
# force anything else to string representation
d2[k_formatted] = str(v)
return d2
def handler(event, context):
# print(json.dumps(event))
for finding in event["detail"]["findings"]:
finding_identifier = {"Id": finding["Id"], "ProductArn": finding["ProductArn"]}
print(json.dumps(finding_identifier, default=str))
current_user_defined_fields = finding.get("UserDefinedFields", {})
if updated_user_defined_fields_key in current_user_defined_fields:
print("Finding previously updated, skipping")
return
account = get_account_data(finding["AwsAccountId"], ttl_hash=get_ttl_hash())
timestamp = datetime.now(timezone.utc).strftime("%y-%m-%d %H:%M:%S %Z")
new_user_defined_fields = dict_dot_notation({"Account": account, updated_user_defined_fields_key: timestamp})
user_defined_fields = {**current_user_defined_fields, **new_user_defined_fields}
print("UserDefinedFields")
print(json.dumps(user_defined_fields, default=str))
note = {
# data.Note.Text should NOT be longer than 512 characters.
"Text": json.dumps(account, default=str)[:512],
"UpdatedBy": context.invoked_function_arn,
}
print("Note")
print(json.dumps(note, default=str))
sechub.batch_update_findings(
FindingIdentifiers=[finding_identifier],
Note=note,
UserDefinedFields=user_defined_fields,
)
# print("Combined cache info summary:")
# combined_cache_info = {}
# for f in [get_organizational_parent, get_account_data]:
# combined_cache_info[f.__name__] = {}
# for a in ["currsize", "hits", "misses"]:
# combined_cache_info[f.__name__][a] = getattr(f.cache_info(), a)
# print(json.dumps(combined_cache_info, default=str))
SecurityHubFindingUpdateFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${SecurityHubFindingUpdateFunction}'
RetentionInDays: 14
Tags:
- Key: CfnStackId
Value: !Ref AWS::StackId
SecurityHubFindingUpdateFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Tags:
- Key: CfnStackId
Value: !Ref AWS::StackId
ManagedPolicyArns:
- !Sub 'arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
Policies:
- PolicyName: Inline
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- organizations:Describe*
- organizations:List*
- securityhub:BatchUpdateFindings
Resource: '*'
SecurityHubFindingsEventsRule:
Type: AWS::Events::Rule
Properties:
Description: !Sub >-
Invoke Lambda function ${SecurityHubFindingUpdateFunction.Arn} for Security
Hub findings. All new findings and all updates to existing findings. Created
by ${AWS::StackId}
State: ENABLED
EventPattern:
source: [aws.securityhub]
detail-type: [Security Hub Findings - Imported]
detail:
findings:
UserDefinedFields:
# This key string is also set in lambda function env vars
'_AccountDataAdded': [ { "exists": false } ]
Targets:
- Id: SecurityHubFindingUpdateFunction
Arn: !Sub '${SecurityHubFindingUpdateFunction.Arn}'
SecurityHubFindingUpdateFunctionPermissionSecurityHubFindingsEventsRule:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref SecurityHubFindingUpdateFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !Sub '${SecurityHubFindingsEventsRule.Arn}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment