Skip to content

Instantly share code, notes, and snippets.

@embano1
Last active June 23, 2023 14:49
Show Gist options
  • Save embano1/c347ff315281f18d3278d285fcd9e7f5 to your computer and use it in GitHub Desktop.
Save embano1/c347ff315281f18d3278d285fcd9e7f5 to your computer and use it in GitHub Desktop.
EventBridge CloudEvents Input Transformer

About

Transform EventBridge Events to CloudEvents using Input Transformer, incl. projecting AWS Region and Account ID from the EventBridge event to CloudEvent extended attributes.

Note
You can test whether the transformed CloudEvent is valid using this simple CloudEvent validator utility.

Example Event

Note
Example from https://github.com/awslabs/eventbridge-kafka-connector

{
    "version": "0",
    "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
    "account": "1234567890",
    "time": "2023-05-23T11:38:46Z",
    "region": "us-east-1",
    "detail-type": "kafka-connect-json-values-topic",
    "source": "kafka-connect.my-json-values-connector",
    "resources": [],
    "detail": {
        "topic": "json-values-topic",
        "partition": 0,
        "offset": 0,
        "timestamp": 1684841916831,
        "timestampType": "CreateTime",
        "headers": [],
        "key": "order-1",
        "value": {
            "orderItems": [
                "item-1",
                "item-2"
            ],
            "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
        }
    }
}

Input Transformer

{
  "data": "$.detail",
  "id": "$.id",
  "source": "$.source",
  "time": "$.time",
  "type": "$.detail-type",
  "region": "$.region",
  "account": "$.account"
}

Template

Note
<data> must be passed without quotes

{
  "specversion": "1.0",
  "id": "<id>",
  "source": "<source>",
  "type": "<type>",
  "time": "<time>",
  "awsregion": "<region>",
  "account": "<account>",
  "data": <data>
}

Result

{
  "specversion": "1.0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "source": "kafka-connect.my-json-values-connector",
  "type": "kafka-connect-json-values-topic",
  "time": "2023-05-23T11:38:46Z",
  "awsregion": "us-east-1",
  "account": "1234567890",
  "data": {
    "topic": "json-values-topic",
    "partition": 0,
    "offset": 0,
    "timestamp": 1684841916831,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
    }
  }
}
import * as cdk from 'aws-cdk-lib';
import { EventBus, EventField, Rule, RuleTargetInput } from 'aws-cdk-lib/aws-events';
import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';
import { join } from 'path';
export class EbLambdaCloudeventsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const dlq = new Queue(this, "dlq");
const lambda = new NodejsFunction(this, "ce-function", {
entry: join(__dirname, "..", "lambda", "index.ts"),
handler: "handler",
timeout: cdk.Duration.seconds(3),
reservedConcurrentExecutions: 1,
deadLetterQueue: dlq,
});
const bus = new EventBus(this, "bus");
const target = new LambdaFunction(lambda, {
retryAttempts: 0,
deadLetterQueue: dlq,
event: RuleTargetInput.fromObject({
specversion: "1.0",
id: EventField.eventId,
source: EventField.source,
type: EventField.detailType,
time: EventField.time,
region: EventField.region,
subject: EventField.fromPath('$.detail.key'),
account: EventField.account,
data: EventField.fromPath('$.detail')
})
});
new Rule(this, "rule", {
eventBus: bus,
eventPattern: {
account: [cdk.Stack.of(this).account],
},
targets: [target],
})
new cdk.CfnOutput(this, "busArn", {
value: bus.eventBusArn,
})
new cdk.CfnOutput(this, "functionArn", {
value: lambda.functionArn,
})
new cdk.CfnOutput(this, "dlqArn", {
value: dlq.queueArn,
})
}
}
import { CloudEvent } from "cloudevents";
export const handler = async function (event: any) {
console.log("received event: %s", event);
try {
const ce = new CloudEvent(event);
ce.validate(); // throws if invalid
console.log("received valid cloudevent: %s", ce);
return {
statusCode: 200,
body: "",
};
}
catch (e) {
const msg = `invalid cloudevent: ${e}`
console.log(msg)
return {
statusCode: 400,
body: msg,
};
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment