Skip to content

Instantly share code, notes, and snippets.

@atheiman
Last active December 4, 2024 01:18
Show Gist options
  • Save atheiman/5b0dc87eb8d5096e4e2956b3ab8997f5 to your computer and use it in GitHub Desktop.
Save atheiman/5b0dc87eb8d5096e4e2956b3ab8997f5 to your computer and use it in GitHub Desktop.
Terraform to deploy a Config custom rule w/ Lambda function to evaluate resource tag compliance
# Provider requirements for this configuration.
terraform {
  required_providers {
    # AWS provider from the HashiCorp namespace, major version 5 or newer.
    aws = {
      version = ">= 5.0"
      source  = "hashicorp/aws"
    }
  }
}
# Partition of the current provider configuration; used when building ARNs.
data "aws_partition" "current" {}

locals {
  # Role name, including its leading path, that the evaluation function assumes
  # in the account owning each evaluated resource.
  # aws_config_organization_custom_rule also deploys the rule to the current
  # account by default, so this role must exist and be assumable there as well.
  cross_acct_role_with_path = "/OrganizationAccountAccessRole"

  partition = data.aws_partition.current.partition
}
# Zip the evaluation function source so it can be uploaded as the Lambda package.
data "archive_file" "lambda" {
  type        = "zip"
  output_path = "./lambda.zip"
  source_file = "./config_tag_compliance_evaluation.py"
}
# Lambda function that evaluates resource tag compliance for the Config rule.
resource "aws_lambda_function" "config_evaluation" {
  function_name = "config-tagging-compliance"
  role          = aws_iam_role.lambda.arn
  runtime       = "python3.11"
  timeout       = 10

  # The handler has the form "<module>.<function>". The archive holds the single
  # file config_tag_compliance_evaluation.py, so the module part must match that
  # file name; otherwise Lambda cannot import the handler at invocation time.
  handler = "config_tag_compliance_evaluation.handler"

  # Deployment package and its hash; a changed hash triggers a code update.
  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256

  environment {
    variables = {
      # Path-qualified role name the function assumes in each resource account.
      CROSS_ACCT_ROLE_WITH_PATH = local.cross_acct_role_with_path
    }
  }
}
# Allow the AWS Config service principal to invoke the evaluation function.
resource "aws_lambda_permission" "config_evaluation_config" {
  function_name = aws_lambda_function.config_evaluation.function_name
  principal     = "config.amazonaws.com"
  action        = "lambda:InvokeFunction"

  # TODO: specify wildcard matching config rule arn
  # source_arn = "arn:${local.partition}:config:*:*:rule/some-rule-name"
  #
  # The Lambda service does not yet support the aws:SourceOrgID condition, so
  # `source_account` or `source_arn` can be used until source_org_id is supported.
  # https://aws.amazon.com/about-aws/whats-new/2023/11/organization-wide-iam-condition-keys-restrict-aws-service-to-service-requests/
}
# Log group named after the evaluation function; log events are kept for 14 days.
resource "aws_cloudwatch_log_group" "lambda" {
  retention_in_days = 14
  name              = "/aws/lambda/${aws_lambda_function.config_evaluation.function_name}"
}
# Execution role for the evaluation function.
resource "aws_iam_role" "lambda" {
  name_prefix = "lambda-config-tagging-compliance-"

  # AWS-managed basic execution policy, addressed within the current partition.
  managed_policy_arns = [
    "arn:${local.partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
  ]

  # Trust policy: the Lambda service is the only principal allowed to assume the role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action    = "sts:AssumeRole"
        Effect    = "Allow"
        Principal = { Service = "lambda.amazonaws.com" }
      },
    ]
  })
}
# Inline policy letting the function assume the cross-account role in any account.
resource "aws_iam_role_policy" "lambda" {
  role = aws_iam_role.lambda.id
  name = "Inline"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # The account id is a wildcard; the role name and path come from
        # local.cross_acct_role_with_path.
        Resource = "arn:${local.partition}:iam::*:role${local.cross_acct_role_with_path}"
        Action   = "sts:AssumeRole"
        Effect   = "Allow"
      },
    ]
  })
}
# Either organization-custom-rule or organization-conformance-pack can be used to deploy
# the rule to the organization.
# organization_custom_rule will be deployed with name prefix `OrgConfigRule-` and a random suffix.
# Organization-wide custom Config rule backed by the tag compliance function.
resource "aws_config_organization_custom_rule" "required-tags" {
  name                = "required-tags"
  lambda_function_arn = aws_lambda_function.config_evaluation.arn

  # Config must already be permitted to invoke the function when the rule is created.
  depends_on = [aws_lambda_permission.config_evaluation_config]

  # "Change" notifications are raised per resource, which makes the evaluation
  # function much simpler to write. ScheduledNotification only tells the function
  # that it is time to evaluate all resources. Events sent by Config are documented at:
  # https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
  trigger_types = [
    "ConfigurationItemChangeNotification",
    "OversizedConfigurationItemChangeNotification",
    # "ScheduledNotification",
  ]

  # Resource types in scope for the rule. Supported types:
  # https://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html
  resource_types_scope = [
    "AWS::EC2::Instance",
    "AWS::RDS::DBCluster",
    "AWS::RDS::DBInstance",
    "AWS::S3::Bucket",
  ]
}
# resource "aws_config_organization_conformance_pack" "example" {
# depends_on = [aws_lambda_permission.config_evaluation_config]
# name = "example"
# template_body = <<-EOT
# Resources:
# ConfigRule:
# Type: AWS::Config::ConfigRule
# Properties:
# ConfigRuleName: required-tags
# Description: Evaluate tag policy compliance of AWS resources
# Scope:
# ComplianceResourceTypes:
# # Resource types: https://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html
# - 'AWS::EC2::Instance'
# - 'AWS::RDS::DBCluster'
# - 'AWS::RDS::DBInstance'
# - 'AWS::S3::Bucket'
# Source:
# Owner: CUSTOM_LAMBDA
# SourceIdentifier: !Sub "${aws_lambda_function.config_evaluation.arn}"
# SourceDetails:
# # ConfigurationItemChangeNotification and OversizedConfigurationItemChangeNotification
# # send full resource config to evaluation function for easier evaluation.
# # ScheduledNotification only notifies the evaluation function that it is time to evaluate
# # all resources. "Change" notifications are much easier to write evaluation functions for
# # because of this. See documented events sent by Config here:
# # https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
# - MessageType: ConfigurationItemChangeNotification
# EventSource: aws.config
# - MessageType: OversizedConfigurationItemChangeNotification
# EventSource: aws.config
# # - MessageType: ScheduledNotification
# # MaximumExecutionFrequency: TwentyFour_Hours
# # EventSource: aws.config
# EOT
# }
# See documented events sent by Config here: https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html
#
# It is much easier to write evaluations for rules using ConfigurationItemChangeNotification and
# OversizedConfigurationItemChangeNotification. These notifications include the resource as recorded
# by Config. The Lambda function can review the resource config json and submit an evaluation for
# the resource.
#
# ScheduledNotification events are not specific to a resource; the event only includes
# the account id and rule name. Lambda functions must list all the resources in the account using
# service apis, call the appropriate apis to evaluate each resource's config, and then submit
# evaluations for each resource.
import boto3
import os
import json
import re
# Work out the partition used when building IAM role ARNs. region_name is None
# when no region is configured, so fall back to an empty string instead of
# failing on None.startswith() at import time.
_region = boto3.session.Session().region_name or ""
if _region.startswith("us-gov-"):
    partition = "aws-us-gov"
elif _region.startswith("cn-"):
    # China regions live in their own partition; "aws" would yield invalid ARNs.
    partition = "aws-cn"
else:
    partition = "aws"

# STS client created with the function's own credentials; get_session uses it
# to assume the cross-account role.
sts = boto3.client("sts")
# Mapping of required tag keys to optional tag value requirements.
# An empty spec means the key must exist but any value is accepted.
#   "AllowedValues": the tag value must be one of the listed strings.
#   "Regex": the tag value must match the pattern (checked with re.search).
required_tags = {
    "Environment": {"AllowedValues": ["prod", "nonprod"]},
    "Application": {},
    # The dot is escaped so only the literal domain "example.com" is accepted;
    # an unescaped "." would also match e.g. "user@exampleXcom".
    "Owner": {"Regex": r"^.+@example\.com$"},
}
# Configuration item statuses Config reports once a resource no longer exists.
_DELETED_CI_STATUSES = ("ResourceDeleted", "ResourceDeletedNotRecorded")


def _get_recorded_item(config, ci_summary):
    """Fetch the configuration item that *ci_summary* refers to from Config.

    Needed for OversizedConfigurationItemChangeNotification events, which carry
    only a summary of the configuration item rather than the item itself.

    Raises:
        Exception: if Config returns no configuration item for the resource.
    """
    history = config.get_resource_config_history(
        resourceType=ci_summary["resourceType"],
        resourceId=ci_summary["resourceId"],
        laterTime=ci_summary["configurationItemCaptureTime"],
        limit=1,
    )
    items = history["configurationItems"]
    if not items:
        raise Exception(
            f"No configuration item recorded for {ci_summary['resourceType']} - {ci_summary['resourceId']}"
        )
    return items[0]


def _tag_violations(tags_dict):
    """Return one annotation string per way *tags_dict* violates required_tags.

    An empty list means the tags are compliant. Missing required keys are
    reported together in a single annotation placed first in the list.
    """
    missing_required = []
    annotations = []
    for required_key, value_spec in required_tags.items():
        if required_key not in tags_dict:
            # Nothing further to check for a tag that does not exist.
            missing_required.append(required_key)
            continue
        value = tags_dict[required_key]
        if "AllowedValues" in value_spec and value not in value_spec["AllowedValues"]:
            annotations.append(f"Tag '{required_key}' value must be one of {value_spec['AllowedValues']}.")
        if "Regex" in value_spec and not re.search(value_spec["Regex"], value):
            annotations.append(f"Tag '{required_key}' value must match regular expression '{value_spec['Regex']}'.")
    if missing_required:
        annotations.insert(0, f"Missing required tag keys: {missing_required}.")
    return annotations


def handler(event, context):
    """Evaluate one resource's tags against required_tags and report the result to Config.

    Handles both ConfigurationItemChangeNotification and
    OversizedConfigurationItemChangeNotification events. The evaluation is
    submitted with put_evaluations from a session in the resource's account.

    Args:
        event: Config rule invocation event; "invokingEvent" is a JSON string and
            "resultToken" is passed back to put_evaluations.
        context: Lambda context object (unused).

    Raises:
        Exception: if the resource's tags are not exposed in its configuration item.
    """
    invoking_event = json.loads(event["invokingEvent"])

    # Oversized notifications replace "configurationItem" with a summary that has
    # the identifying fields (account, type, id, ARN) but not the tags.
    oversized = invoking_event.get("messageType") == "OversizedConfigurationItemChangeNotification"
    ci = invoking_event["configurationItemSummary" if oversized else "configurationItem"]
    print(ci["resourceType"], ci["ARN"])

    # Open a boto3 session in the resource account. Used to interact with the
    # Config api there (read resource history, submit evaluations).
    resource_acct_session = get_session(ci["awsAccountId"])
    print("Resource acct session created:", resource_acct_session.client("sts").get_caller_identity()["Arn"])
    config = resource_acct_session.client("config")

    evaluation = {
        "ComplianceResourceType": ci["resourceType"],
        "ComplianceResourceId": ci["resourceId"],
        "ComplianceType": "COMPLIANT",
        "OrderingTimestamp": invoking_event["notificationCreationTime"],
    }

    if event.get("eventLeftScope") or ci.get("configurationItemStatus") in _DELETED_CI_STATUSES:
        # A deleted or out-of-scope resource has nothing to evaluate; reporting
        # it as NON_COMPLIANT would leave a stale finding behind.
        evaluation["ComplianceType"] = "NOT_APPLICABLE"
    else:
        tagged_item = _get_recorded_item(config, ci) if oversized else ci
        # Verify resource tags are exposed in the CI. If this ever raises for a
        # resource type, the tags might be exposed elsewhere in the invokingEvent,
        # or a boto3 service client could be used to retrieve them.
        if "tags" not in tagged_item:
            raise Exception(
                f"Could not find 'tags' in invokingEvent.configurationItem - {ci['resourceType']} - {ci['ARN']}"
            )
        tags_dict = tagged_item["tags"]
        print(json.dumps(tags_dict))

        annotations = _tag_violations(tags_dict)
        if annotations:
            evaluation["ComplianceType"] = "NON_COMPLIANT"
            # Annotation must be submitted as a single string, limited to 256
            # characters by the Config api.
            annotation = " ".join(annotations)
            if len(annotation) > 256:
                annotation = annotation[:253] + "..."
            evaluation["Annotation"] = annotation

    print(json.dumps(evaluation))
    # Submit the evaluation and log the api response
    print(
        json.dumps(
            config.put_evaluations(Evaluations=[evaluation], ResultToken=event["resultToken"]),
            default=str,
        )
    )
def get_session(acct_id):
    """Return a boto3 session using temporary credentials for the cross-account role.

    The role ARN is built from the module-level partition, *acct_id*, and the
    CROSS_ACCT_ROLE_WITH_PATH environment variable. The role session is named
    after the Lambda function when AWS_LAMBDA_FUNCTION_NAME is set.
    """
    role_arn = f"arn:{partition}:iam::{acct_id}:role{os.environ['CROSS_ACCT_ROLE_WITH_PATH']}"
    session_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "cross-acct-config-rule")

    # Exchange the function's own credentials for credentials of the target role.
    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    temp_creds = assumed["Credentials"]

    return boto3.Session(
        aws_access_key_id=temp_creds["AccessKeyId"],
        aws_secret_access_key=temp_creds["SecretAccessKey"],
        aws_session_token=temp_creds["SessionToken"],
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment