Skip to content

Instantly share code, notes, and snippets.

@skwashd
Created November 3, 2024 09:06
Show Gist options
  • Save skwashd/e21ea7bdc7cb9a14ccf5d3fe44d80bcd to your computer and use it in GitHub Desktop.
Save skwashd/e21ea7bdc7cb9a14ccf5d3fe44d80bcd to your computer and use it in GitHub Desktop.
Archive Replayed EventBridge Events

EventBridge Event Replay Module

Amazon EventBridge Archive ignores replayed events. The archive rule is a managed rule and so there is no way to remove this restriction. This limitation can be frustrating when standing up a new bus.

This Terraform module configures an AWS Step Functions Express workflow that handles replayed events from an EventBridge bus. Events with a replay-name property are sent directly to the workflow and reinjected into the same bus with the replay marker removed.

Flow

  1. EventBridge rule captures events with replay-name property
  2. Events are routed directly to a Step Function
  3. The workflow constructs a new event payload while preserving the original event time
  4. Puts the event on the bus

Usage

module "event_replay" {
  source = "./event-replay"

  bus_name = "my-event-bus"
  key_id   = "alias/my-key"
  role_prefix = "event-replay"
  namespace = "replay-to-archive"
  
  tags = {
    Environment = "production"
    Service     = "event-replay"
  }
}

Requirements

Name Version
terraform >= 1.0
aws >= 5.0

Providers

Name Version
aws 5.74.0

Modules

No modules.

Resources

Name Type
aws_cloudwatch_event_rule.replay resource
aws_cloudwatch_event_target.sqs resource
aws_cloudwatch_log_group.sfn resource
aws_iam_policy.events resource
aws_iam_policy.sfn resource
aws_iam_role.events resource
aws_iam_role.sfn resource
aws_iam_role_policy_attachment.events resource
aws_iam_role_policy_attachment.sfn resource
aws_sfn_state_machine.replay resource
aws_sqs_queue.dlq resource
aws_cloudwatch_event_bus.target data source
aws_iam_policy.permission_boundary data source
aws_iam_policy_document.events data source
aws_iam_policy_document.events_assume data source
aws_iam_policy_document.sfn data source
aws_iam_policy_document.sfn_assume data source
aws_kms_key.encryption data source

Inputs

Name Description Type Default Required
bus_name Name of the EventBridge event bus string n/a yes
key_id ID of KMS key used for encryption string n/a yes
namespace Namespace for resources string "replay-to-archive" no
permission_boundary ARN of the permission boundary policy to use for IAM roles string null no
role_prefix Prefix to use for IAM role names string "" no
tags Tags to apply to all resources map(string) n/a yes

Outputs

Name Description
dlq_url URL of the dead letter queue
state_machine_arn ARN of the Step Functions state machine
# Rule on the target bus that matches any event carrying a top-level
# "replay-name" field, regardless of its value.
resource "aws_cloudwatch_event_rule" "replay" {
  event_bus_name = data.aws_cloudwatch_event_bus.target.name
  name           = format("%s-to-sfn", var.namespace)
  tags           = var.tags

  event_pattern = jsonencode({
    "replay-name" = [
      { exists = true }
    ]
  })
}
# Sends events matched by the replay rule to the state machine, using the
# "events" IAM role. Undeliverable events go to the dead letter queue.
# Note: the resource label is "sqs" although the target is the state machine.
resource "aws_cloudwatch_event_target" "sqs" {
  target_id      = "sfn"
  event_bus_name = aws_cloudwatch_event_rule.replay.event_bus_name
  rule           = aws_cloudwatch_event_rule.replay.name

  arn      = aws_sfn_state_machine.replay.arn
  role_arn = aws_iam_role.events.arn

  dead_letter_config {
    arn = aws_sqs_queue.dlq.arn
  }
}
{
  "Comment": "Event replay workflow",
  "StartAt": "PrepareEvent",
  "States": {
    "PrepareEvent": {
      "Type": "Pass",
      "Comment": "Build a single PutEvents entry from the incoming event, keeping its source, detail-type, detail and original time",
      "Parameters": {
        "Entries": [{
          "Source.$": "$.source",
          "DetailType.$": "$.detail-type",
          "Detail.$": "$.detail",
          "EventBusName": "${event_bus_name}",
          "Time.$": "$.time"
        }]
      },
      "Next": "PutEvents"
    },
    "PutEvents": {
      "Type": "Task",
      "Comment": "Put the rebuilt entry on the bus; task failures are retried up to 5 times with backoff, any remaining error goes to HandleError",
      "Resource": "arn:aws:states:::aws-sdk:eventbridge:putEvents",
      "Parameters": {
        "Entries.$": "$.Entries"
      },
      "End": true,
      "Retry": [{
        "ErrorEquals": ["States.TaskFailed"],
        "IntervalSeconds": 1,
        "MaxAttempts": 5,
        "BackoffRate": 2
      }],
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "HandleError"
      }]
    },
    "HandleError": {
      "Type": "Pass",
      "Comment": "Emit the caught error. Uses Parameters because Result is literal JSON and does not evaluate .$ paths; Cause is passed through as-is because it is not guaranteed to be JSON",
      "Parameters": {
        "status": "failed",
        "error.$": "$.Error",
        "cause.$": "$.Cause"
      },
      "End": true
    }
  }
}
# Execution role for the state machine, assumable per the sfn_assume document.
resource "aws_iam_role" "sfn" {
  name                 = join("", [local.role_prefix, "sfn"])
  permissions_boundary = local.permission_boundary
  assume_role_policy   = data.aws_iam_policy_document.sfn_assume.json
  tags                 = var.tags
}
# Trust policy allowing the states.amazonaws.com service principal to assume the role.
data "aws_iam_policy_document" "sfn_assume" {
  statement {
    principals {
      identifiers = ["states.amazonaws.com"]
      type        = "Service"
    }

    actions = ["sts:AssumeRole"]
  }
}
# Permissions for the state machine role: publish to the target bus, plus the
# CloudWatch Logs actions listed below.
data "aws_iam_policy_document" "sfn" {
  # Re-inject events into the target bus only.
  statement {
    resources = [data.aws_cloudwatch_event_bus.target.arn]
    actions   = ["events:PutEvents"]
  }

  # Log delivery management and log writes.
  statement {
    actions = [
      "logs:CreateLogDelivery",
      "logs:GetLogDelivery",
      "logs:UpdateLogDelivery",
      "logs:DeleteLogDelivery",
      "logs:ListLogDeliveries",
      "logs:PutLogEvents",
      "logs:PutResourcePolicy",
      "logs:DescribeResourcePolicies",
      "logs:DescribeLogGroups",
    ]

    #tfsec:ignore:aws-iam-no-policy-wildcards Wildcards are needed for these unscoped actions
    resources = ["*"]
  }
}
# Managed policy for the state machine role; shares the role's name.
resource "aws_iam_policy" "sfn" {
  name   = aws_iam_role.sfn.name
  policy = data.aws_iam_policy_document.sfn.json

  # Tagged like the other taggable resources in this module.
  tags = var.tags
}
# Attaches the state machine policy to the state machine role.
resource "aws_iam_role_policy_attachment" "sfn" {
  policy_arn = aws_iam_policy.sfn.arn
  role       = aws_iam_role.sfn.name
}
# Role used by the EventBridge target, assumable per the events_assume document.
resource "aws_iam_role" "events" {
  name                 = join("", [local.role_prefix, "events"])
  permissions_boundary = local.permission_boundary
  assume_role_policy   = data.aws_iam_policy_document.events_assume.json
  tags                 = var.tags
}
# Trust policy allowing the events.amazonaws.com service principal to assume the role.
data "aws_iam_policy_document" "events_assume" {
  statement {
    principals {
      identifiers = ["events.amazonaws.com"]
      type        = "Service"
    }

    actions = ["sts:AssumeRole"]
  }
}
# Permissions for the EventBridge role: start the replay state machine and
# send messages to the dead letter queue.
data "aws_iam_policy_document" "events" {
  statement {
    resources = [aws_sfn_state_machine.replay.arn]
    actions   = ["states:StartExecution"]
  }

  statement {
    resources = [aws_sqs_queue.dlq.arn]
    actions   = ["sqs:SendMessage"]
  }
}
# Managed policy for the EventBridge role; shares the role's name.
resource "aws_iam_policy" "events" {
  name   = aws_iam_role.events.name
  policy = data.aws_iam_policy_document.events.json

  # Tagged like the other taggable resources in this module.
  tags = var.tags
}
# Attaches the EventBridge policy to the EventBridge role.
resource "aws_iam_role_policy_attachment" "events" {
  policy_arn = aws_iam_policy.events.arn
  role       = aws_iam_role.events.name
}
# Existing event bus, looked up by the name passed in var.bus_name.
data "aws_cloudwatch_event_bus" "target" {
  name = var.bus_name
}
# Existing KMS key, looked up from var.key_id; its ARN is used for the DLQ.
data "aws_kms_key" "encryption" {
  key_id = var.key_id
}
# Optional permissions-boundary policy; only looked up when the variable is set.
#
# The variable is documented as an ARN, but the lookup previously matched on
# `name` only, so an ARN value could never resolve. Values starting with
# "arn:" are now looked up by ARN; anything else is still treated as a policy
# name, so existing callers passing a name keep working.
data "aws_iam_policy" "permission_boundary" {
  count = var.permission_boundary != null ? 1 : 0

  arn  = can(regex("^arn:", var.permission_boundary)) ? var.permission_boundary : null
  name = can(regex("^arn:", var.permission_boundary)) ? null : var.permission_boundary
}
# Exposes the replay state machine's ARN to callers of the module.
output "state_machine_arn" {
  value       = aws_sfn_state_machine.replay.arn
  description = "ARN of the Step Functions state machine"
}
# Exposes the dead letter queue's URL to callers of the module.
output "dlq_url" {
  value       = aws_sqs_queue.dlq.url
  description = "URL of the dead letter queue"
}
locals {
  # Rendered state machine definition; the template receives the bus name
  # as `event_bus_name`.
  sfn_definition = templatefile(
    "${path.module}/flow.asl.json.tpl",
    { event_bus_name = data.aws_cloudwatch_event_bus.target.name }
  )
}
# Express state machine running the rendered replay definition, with full
# logging (including execution data) to the module's log group.
resource "aws_sfn_state_machine" "replay" {
  name       = var.namespace
  type       = "EXPRESS"
  role_arn   = aws_iam_role.sfn.arn
  definition = local.sfn_definition
  tags       = var.tags

  logging_configuration {
    level                  = "ALL"
    include_execution_data = true
    log_destination        = "${aws_cloudwatch_log_group.sfn.arn}:*"
  }
}
# Log group receiving the state machine's logs; entries are kept for 14 days.
#tfsec:ignore:aws-cloudwatch-log-group-customer-key Given the data being logged, CWL-SSE is adequate
resource "aws_cloudwatch_log_group" "sfn" {
  name              = format("/aws/vendedlogs/states/%s", var.namespace)
  retention_in_days = 14
  tags              = var.tags
}
# Dead letter queue for events the EventBridge target could not deliver,
# encrypted with the supplied KMS key.
resource "aws_sqs_queue" "dlq" {
  name              = "${var.namespace}-dlq"
  kms_master_key_id = data.aws_kms_key.encryption.arn
  tags              = var.tags
}

# EventBridge writes to a target's dead letter queue as the
# events.amazonaws.com service principal, so the queue needs a resource-based
# policy; without it dead-lettering fails. Access is limited to the replay rule.
# NOTE(review): because the queue uses a customer-managed KMS key, that key's
# policy presumably also has to let events.amazonaws.com use it
# (kms:GenerateDataKey / kms:Decrypt) - confirm against the key policy.
resource "aws_sqs_queue_policy" "dlq" {
  queue_url = aws_sqs_queue.dlq.url

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Sid       = "AllowEventBridgeDeadLetter"
      Effect    = "Allow"
      Principal = { Service = "events.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.dlq.arn
      Condition = {
        ArnEquals = {
          "aws:SourceArn" = aws_cloudwatch_event_rule.replay.arn
        }
      }
    }]
  })
}
# Required: the bus is looked up by this name.
variable "bus_name" {
  description = "Name of the EventBridge event bus"
  type        = string
}
# Required: passed as key_id to the KMS key lookup.
variable "key_id" {
  description = "ID of KMS key used for encryption"
  type        = string
}
# Base name used for the rule, state machine, log group, queue and role names.
variable "namespace" {
  type        = string
  default     = "replay-to-archive"
  description = "Namespace for resources"
}
# Optional; when null no permissions boundary is applied to the roles.
variable "permission_boundary" {
  default     = null
  description = "ARN of the permission boundary policy to use for IAM roles"
  type        = string
}
# Optional; an empty string means role names start with the namespace.
variable "role_prefix" {
  default     = ""
  description = "Prefix to use for IAM role names"
  type        = string
}
# Required: there is no default.
variable "tags" {
  description = "Tags to apply to all resources"
  type        = map(string)
}
locals {
  # ARN of the looked-up boundary policy, or null when none was requested.
  permission_boundary = var.permission_boundary == null ? null : data.aws_iam_policy.permission_boundary[0].arn

  # "<role_prefix>-<namespace>-" when a prefix is given, otherwise "<namespace>-".
  role_prefix = format(
    "%s%s-",
    var.role_prefix == "" ? "" : "${var.role_prefix}-",
    var.namespace
  )
}
# Minimum Terraform and AWS provider versions for this module.
terraform {
  required_providers {
    aws = {
      version = ">= 5.0"
      source  = "hashicorp/aws"
    }
  }

  required_version = ">= 1.0"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment