@mikebway
Last active October 4, 2022 17:50
Golang DynamoDB CDC stream Lambda support for unmarshalling record images to custom structure types
package ddbevent
import (
"fmt"
"github.com/aws/aws-lambda-go/events"
)
// Why This Is Needed
// ==================
//
// Rather than reusing the same code, Amazon has repeated the implementation of DynamoDB "attribute value" structures in three
// separate SDK APIs: the `service/dynamodb` DynamoDB read-write API, the `service/dynamodbstreams` CDC stream handling API,
// and the `aws-lambda-go/events` Lambda event handling API. Sadly, only the DynamoDB `service/dynamodb` API supports
// marshalling and unmarshalling to and from custom application structure types.
//
// With the `feature/dynamodb/attributevalue` package, Amazon has provided a mechanism to easily convert from the
// `service/dynamodbstreams` attribute value representation to the `service/dynamodb` read-write API representation,
// explicitly in order to support marshalling and unmarshalling to and from custom application structures. However, they
// have not matched that capability for the `aws-lambda-go/events` Lambda event handling API representation.
//
// The result is that Lambda authors have to write a great deal of tedious code to manually walk the received image structures,
// copying them element by element into the format in which they actually need the data before they can do any useful work. The
// purpose of this project is to demonstrate how a non-application specific library module can be implemented to simplify
// Lambda interpretation of DynamoDB CDC event streams.
//
//
// How To Use This Package
// =======================
//
// This ddbevent package provides a direct replacement for the DynamoDBEvent type found in the aws-lambda-go/events
// package. Simply swap the aws-lambda-go/events package reference in your DynamoDB CDC stream Lambda's handler implementation
// for this ddbevent package:
//
//	func (h *Handler) HandleRequest(ctx context.Context, event ddbevent.DynamoDBEvent) error {
//		// ...
//	}
//
// Suppose that you only care about the new image for inserted or modified DynamoDB "product items". Your
// `HandleRequest` implementation might look something like this:
//
//	func (h *Handler) HandleRequest(ctx context.Context, event ddbevent.DynamoDBEvent) error {
//		err := h.processRecords(&event.Records)
//		// ...
//	}
//
// and the `processRecords` method would look something like this:
//
//	func (h *Handler) processRecords(records *[]ddbevent.DynamoDBEventRecord) error {
//		for _, rec := range *records {
//			if rec.ChangeType == ddbevent.Insert || rec.ChangeType == ddbevent.Modify {
//				newImage := &ProductItem{}
//				if err := rec.UnmarshalNewImage(newImage); err != nil {
//					return err
//				}
//				// ...
//			}
//
//			// ...
//		}
//		return nil
//	}
//
// That's all you would need to write to populate a custom `ProductItem` structure with data from a
// DynamoDB record image in a CDC event.
//
// In addition to UnmarshalNewImage, this package also provides the corresponding UnmarshalOldImage method
// and a foundational Transform method.
//
// Both Unmarshal...Image methods invoke Transform, so you don't typically need to know about it. However, if
// you want the conversion from aws-lambda-go/events types to service/dynamodb types without unmarshalling
// whole images, you can invoke Transform yourself and then apply the attributevalue unmarshal functions to just
// the parts of the Change.NewImage or Change.OldImage maps that you care about.
import (
munm "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
avtype "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// DynamoDBEvent defines a structure that describes the DynamoDB stream event JSON that is passed as an
// event parameter to a CDC stream handling Lambda handler function.
//
// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html for more
// information about the JSON content of a DynamoDB stream event.
type DynamoDBEvent struct {
// Records is an array of structures, each of which represents the changes made to a single
// DynamoDB item / record. Depending on the load on the DynamoDB table, this array might contain
// anywhere from 1 to 100 entries with the default batch size of 100. The upper limit is set by the BatchSize
// of the Lambda event source mapping, which may be configured in the CDK infrastructure definition.
Records []DynamoDBEventRecord `json:"Records"`
}
// DynamoDBEventRecord represents the changes to a single DynamoDB item / record.
type DynamoDBEventRecord struct {
Change DynamoDBStreamRecord `json:"dynamodb"`
ChangeType ChangeType `json:"eventName"`
EventID string `json:"eventID"`
// ... more fields if needed: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html
// isTransformed will be set to true when the Transform method is called for the first time. It is used to
// prevent multiple invocations of Transform from wasting effort repeating work that has already been done.
isTransformed bool
// transformErr is a large robot that can morph into a transportation vehicle, no, wait, it records the error,
// if any, that was encountered while attempting to execute the Transform method.
transformErr error
}
// DynamoDBStreamRecord wraps both the old and the new image for a DynamoDB record in the CDC stream, i.e. the before and after.
// When the table's stream view type is NEW_AND_OLD_IMAGES, only the NewImage will be populated for creation events, only
// the OldImage for removals, and both for modifications, allowing comparisons between before and after.
type DynamoDBStreamRecord struct {
// Values found in the DynamoDB JSON Lambda event
Keys map[string]events.DynamoDBAttributeValue `json:"Keys"`
OldImageRaw map[string]events.DynamoDBAttributeValue `json:"OldImage"`
NewImageRaw map[string]events.DynamoDBAttributeValue `json:"NewImage"`
// ... more fields if needed: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html
// The "event" images found in the Lambda JSON stream may be cloned into these regular get/put API AttributeValue
// equivalents using the Transform method. Once populated, these can then easily be unmarshalled into application
// specific structures with two lines of code, something like this:
//
//	newImage := &types.ProviderItem{}
//	err := record.UnmarshalNewImage(newImage)
//
// AWS has stupidly managed to define three equivalent DynamoDB attribute value structures across the
// DynamoDB read-write API, the DynamoDB Streams API, and the Lambda Event API, and then failed to give the Lambda
// event version the essential unmarshalling capability of the read-write API, or any way to transform into it :-(
OldImage map[string]avtype.AttributeValue
NewImage map[string]avtype.AttributeValue
}
// ChangeType is an enumerated string type that is used to indicate whether a DynamoDBEventRecord represents the creation,
// update, or removal of an item.
type ChangeType string
const (
// Undefined signals that we cannot determine what the change type might have been. It is typically used only
// as a placeholder when handling an error condition.
Undefined ChangeType = "UNDEFINED"
// Insert flags that a DynamoDBEventRecord represents the insertion of a new item in the DynamoDB table. Records
// with this change type contain only the new image; there are no old details to pass along.
Insert ChangeType = "INSERT"
// Modify flags that a DynamoDBEventRecord represents the update of a previously existing item in the DynamoDB
// table. Records with this change type contain both the new image and the old; these may be compared to determine
// exactly what changed.
Modify ChangeType = "MODIFY"
// Remove flags that a DynamoDBEventRecord represents the deletion of a previously existing item in the DynamoDB
// table. Records with this change type contain only the old image, there being no new details to pass along.
Remove ChangeType = "REMOVE"
)
// UnmarshalNewImage and UnmarshalOldImage may be used on any DynamoDBEventRecord to unmarshal the image
// to an application specific structure. If the Transform method has not already been invoked on
// the record, that will be done before the actual unmarshalling is attempted.
func (r *DynamoDBEventRecord) UnmarshalNewImage(out interface{}) error {
// Make sure we are in a fit state to attempt unmarshalling
err := r.Transform()
if err != nil {
return err
}
// Attempt the unmarshalling (close your eyes, don't look Medusa in the face)
return munm.UnmarshalMap(r.Change.NewImage, out)
}
// UnmarshalOldImage and UnmarshalNewImage may be used on any DynamoDBEventRecord to unmarshal the image
// to an application specific structure. If the Transform method has not already been invoked on
// the record, that will be done before the actual unmarshalling is attempted.
func (r *DynamoDBEventRecord) UnmarshalOldImage(out interface{}) error {
// Make sure we are in a fit state to attempt unmarshalling
err := r.Transform()
if err != nil {
return err
}
// Attempt the unmarshalling (close your eyes, don't look Medusa in the face)
return munm.UnmarshalMap(r.Change.OldImage, out)
}
// Transform copies and converts the new and old image values from the DynamoDB Lambda event version of the images
// into a form that can be easily unmarshalled into application defined structures, something that the native raw
// Lambda event implementation is unable to do.
//
// Calling this method more than once is harmless and has no effect; the method knows if it has already been invoked on
// a given DynamoDBEventRecord and will not waste time repeating the work, returning the same error (if any) as the first
// call. If you are not certain whether Transform has already been invoked, calling it again is the equivalent of testing
// a flag (that you do not have access to) and calling Transform if it is not set. So, if in doubt, just call Transform
// and relax.
func (r *DynamoDBEventRecord) Transform() error {
// If we have already been invoked, there is nothing more to do
if r.isTransformed {
return r.transformErr
}
// Is there an old image to transform?
if r.ChangeType == Modify || r.ChangeType == Remove {
// Transform the old image from Lambda event form to the read write API form
r.Change.OldImage, r.transformErr = transformAVMap(r.Change.OldImageRaw)
}
// Is there a new image to transform?
if r.transformErr == nil && (r.ChangeType == Modify || r.ChangeType == Insert) {
// Transform the new image from Lambda event form to read write API form
r.Change.NewImage, r.transformErr = transformAVMap(r.Change.NewImageRaw)
}
// Flag that we have attempted this transformation before and do not need to repeat it
r.isTransformed = true
// It's almost certain that all went well but someday the planets might align otherwise
return r.transformErr
}
// transformAV converts a single DynamoDB Lambda event attribute value to an equivalent DynamoDB read write API attribute value.
func transformAV(rawValue events.DynamoDBAttributeValue) (avtype.AttributeValue, error) {
// Build our response here
var result avtype.AttributeValue
var err error
// What we have to do depends on the type of the raw value input
switch rawValue.DataType() {
case events.DataTypeBinary:
result = &avtype.AttributeValueMemberB{
Value: rawValue.Binary(),
}
case events.DataTypeBoolean:
result = &avtype.AttributeValueMemberBOOL{
Value: rawValue.Boolean(),
}
case events.DataTypeBinarySet:
result = &avtype.AttributeValueMemberBS{
Value: rawValue.BinarySet(),
}
case events.DataTypeList:
l := avtype.AttributeValueMemberL{}
l.Value, err = transformAVSlice(rawValue.List())
result = &l
case events.DataTypeMap:
m := avtype.AttributeValueMemberM{}
m.Value, err = transformAVMap(rawValue.Map())
result = &m
case events.DataTypeNumber:
result = &avtype.AttributeValueMemberN{
Value: rawValue.Number(),
}
case events.DataTypeNumberSet:
result = &avtype.AttributeValueMemberNS{
Value: rawValue.NumberSet(),
}
case events.DataTypeNull:
result = &avtype.AttributeValueMemberNULL{
Value: true,
}
case events.DataTypeString:
result = &avtype.AttributeValueMemberS{
Value: rawValue.String(),
}
case events.DataTypeStringSet:
result = &avtype.AttributeValueMemberSS{
Value: rawValue.StringSet(),
}
default:
// Seriously, AWS must have added a new data type or something unexpected :-(
err = fmt.Errorf("encountered unrecognized event attribute value data type: %d", rawValue.DataType())
}
// Well, that was long-winded but not so hard really
return result, err
}
// transformAVSlice converts a slice of DynamoDB Lambda event attribute values to an equivalent slice of
// DynamoDB read write API attribute values.
func transformAVSlice(rawAVSlice []events.DynamoDBAttributeValue) ([]avtype.AttributeValue, error) {
// Build our result here
result := make([]avtype.AttributeValue, len(rawAVSlice))
// Iterate over the slice, converting each individual attribute value
var err error
for index, rawValue := range rawAVSlice {
result[index], err = transformAV(rawValue)
if err != nil {
return result, err
}
}
// All done, return the profit from our efforts
return result, nil
}
// transformAVMap converts a map of DynamoDB Lambda event attribute values to an equivalent map of
// DynamoDB read write API attribute values.
func transformAVMap(rawAVMap map[string]events.DynamoDBAttributeValue) (map[string]avtype.AttributeValue, error) {
// Build our result here
result := make(map[string]avtype.AttributeValue)
// Iterate over the range of the map, converting each individual named attribute value
var err error
for key, rawValue := range rawAVMap {
result[key], err = transformAV(rawValue)
if err != nil {
return result, err
}
}
// All done, return the profit from our efforts
return result, nil
}
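The struct tags on DynamoDBEvent, DynamoDBEventRecord and DynamoDBStreamRecord bind to the Lambda stream event JSON in roughly the way shown below. This is a standard-library-only sketch. The type names (`sketchEvent`, `sketchRecord`, `sketchChange`) and the sample payload are hypothetical. Attribute values are also left as raw DynamoDB JSON (`{"S": "..."}`, `{"N": "..."}`) rather than decoded into `events.DynamoDBAttributeValue`, which is what the package above does.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchChange mirrors the "dynamodb" object of a stream record. Each attribute
// value stays in raw DynamoDB JSON form, e.g. {"S": "p-1"} or {"N": "9.99"}.
type sketchChange struct {
	Keys     map[string]map[string]any `json:"Keys"`
	OldImage map[string]map[string]any `json:"OldImage"`
	NewImage map[string]map[string]any `json:"NewImage"`
}

// sketchRecord mirrors the fields that DynamoDBEventRecord binds via its struct tags.
type sketchRecord struct {
	Change    sketchChange `json:"dynamodb"`
	EventName string       `json:"eventName"`
	EventID   string       `json:"eventID"`
}

// sketchEvent mirrors DynamoDBEvent.
type sketchEvent struct {
	Records []sketchRecord `json:"Records"`
}

// sampleEvent is a hypothetical, heavily trimmed INSERT event; real events carry more fields.
const sampleEvent = `{
  "Records": [{
    "eventID": "1",
    "eventName": "INSERT",
    "dynamodb": {
      "Keys": {"Id": {"S": "p-1"}},
      "NewImage": {"Id": {"S": "p-1"}, "Price": {"N": "9.99"}}
    }
  }]
}`

// parseEvent decodes stream event JSON into the sketch types.
func parseEvent(raw string) (sketchEvent, error) {
	var ev sketchEvent
	err := json.Unmarshal([]byte(raw), &ev)
	return ev, err
}

func main() {
	ev, err := parseEvent(sampleEvent)
	if err != nil {
		panic(err)
	}
	for _, rec := range ev.Records {
		// An INSERT carries a NewImage but no OldImage, so OldImage decodes as nil.
		fmt.Println(rec.EventName, rec.Change.NewImage["Price"]["N"], rec.Change.OldImage == nil)
	}
}
```

DynamoDB JSON carries numbers as strings, so the `N` value arrives as the Go string `"9.99"`, not a float.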
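Transform's "call it as often as you like" behaviour is a memoised, one-shot conversion. It caches its error as well as its result, and it converts only the images that the change type says should exist. Below is a minimal standard-library sketch of that pattern. The `memoRecord` type is a hypothetical stand-in for DynamoDBEventRecord, and its `convert` method (with a call counter) is a hypothetical stand-in for transformAVMap.

```go
package main

import (
	"errors"
	"fmt"
)

// memoRecord is a hypothetical stand-in for DynamoDBEventRecord.
type memoRecord struct {
	ChangeType  string
	OldImageRaw map[string]string
	NewImageRaw map[string]string
	OldImage    map[string]string
	NewImage    map[string]string

	conversions   int   // number of convert calls, for illustration only
	isTransformed bool  // set once the first Transform attempt completes
	transformErr  error // error from that first attempt, replayed on later calls
}

// convert stands in for transformAVMap: it copies the map and rejects empty attribute names.
func (r *memoRecord) convert(raw map[string]string) (map[string]string, error) {
	r.conversions++
	out := make(map[string]string, len(raw))
	for k, v := range raw {
		if k == "" {
			return out, errors.New("empty attribute name")
		}
		out[k] = v
	}
	return out, nil
}

// Transform converts each expected image at most once and replays the cached error on repeat calls.
func (r *memoRecord) Transform() error {
	if r.isTransformed {
		return r.transformErr
	}
	// Old images are expected only for MODIFY and REMOVE events.
	if r.ChangeType == "MODIFY" || r.ChangeType == "REMOVE" {
		r.OldImage, r.transformErr = r.convert(r.OldImageRaw)
	}
	// New images are expected only for INSERT and MODIFY events.
	if r.transformErr == nil && (r.ChangeType == "MODIFY" || r.ChangeType == "INSERT") {
		r.NewImage, r.transformErr = r.convert(r.NewImageRaw)
	}
	r.isTransformed = true
	return r.transformErr
}

func main() {
	rec := &memoRecord{ChangeType: "INSERT", NewImageRaw: map[string]string{"Id": "p-1"}}
	first := rec.Transform()
	second := rec.Transform()
	fmt.Println(first, second, rec.conversions, rec.OldImage == nil)
}
```

Because Transform has a pointer receiver, the cached state lives in whichever value it is called on. When ranging over a slice by value (`for _, rec := range records`), the cache belongs to the loop copy. If the slice element itself should remember that it has been transformed, index into the slice instead (`rec := &records[i]`).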
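transformAV, transformAVSlice and transformAVMap form a mutual recursion. Lists and maps recurse back into transformAV, and any unrecognised data type aborts the walk with an error. The standard-library sketch below applies the same recursive shape to DynamoDB JSON already decoded by encoding/json, producing plain Go values instead of `avtype.AttributeValue`. The function names (`toNative`, `listToNative`, `mapToNative`, `decodeImage`) are hypothetical. To keep the sketch short, only the S, N, BOOL, NULL, L and M types are handled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// asAttr checks that a decoded JSON value has the attribute-value shape {"<type>": <value>}.
func asAttr(v any) (map[string]any, error) {
	av, ok := v.(map[string]any)
	if !ok || len(av) != 1 {
		return nil, fmt.Errorf("not a single-key attribute value: %v", v)
	}
	return av, nil
}

// toNative converts one attribute value into a plain Go value.
// N values stay strings, exactly as DynamoDB JSON carries them.
func toNative(v any) (any, error) {
	av, err := asAttr(v)
	if err != nil {
		return nil, err
	}
	for typ, val := range av { // exactly one iteration, guaranteed by asAttr
		switch typ {
		case "S", "N", "BOOL":
			return val, nil // scalars pass through as decoded (string or bool)
		case "NULL":
			return nil, nil
		case "L":
			items, ok := val.([]any)
			if !ok {
				return nil, fmt.Errorf("L value is not a JSON array")
			}
			return listToNative(items)
		case "M":
			fields, ok := val.(map[string]any)
			if !ok {
				return nil, fmt.Errorf("M value is not a JSON object")
			}
			return mapToNative(fields)
		default:
			// Same fallback as transformAV: refuse types we do not understand.
			return nil, fmt.Errorf("unrecognized attribute value type %q", typ)
		}
	}
	return nil, nil // not reached
}

// listToNative converts each list element, stopping at the first error.
func listToNative(items []any) ([]any, error) {
	out := make([]any, len(items))
	for i, item := range items {
		v, err := toNative(item)
		if err != nil {
			return nil, err
		}
		out[i] = v
	}
	return out, nil
}

// mapToNative converts each named attribute, stopping at the first error.
func mapToNative(fields map[string]any) (map[string]any, error) {
	out := make(map[string]any, len(fields))
	for k, item := range fields {
		v, err := toNative(item)
		if err != nil {
			return nil, fmt.Errorf("attribute %q: %w", k, err)
		}
		out[k] = v
	}
	return out, nil
}

// decodeImage parses a DynamoDB JSON image and converts it to plain Go values.
func decodeImage(raw string) (map[string]any, error) {
	var fields map[string]any
	if err := json.Unmarshal([]byte(raw), &fields); err != nil {
		return nil, err
	}
	return mapToNative(fields)
}

func main() {
	img, err := decodeImage(`{"Id":{"S":"p-1"},"Tags":{"L":[{"S":"new"},{"NULL":true}]},"Dims":{"M":{"W":{"N":"3"}}}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(img["Id"], img["Tags"], img["Dims"])
}
```

The real transformAV also has to cover the B, BS, NS and SS types, which is why its switch is longer. The recursion and the early return on the first error follow the same pattern.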