Last active
October 4, 2022 17:50
-
-
Save mikebway/87fe638262c7c08a64648c6f7b380afe to your computer and use it in GitHub Desktop.
Golang DynamoDB CDC stream Lambda support for unmarshalling record images to custom structure types
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ddbevent | |
import ( | |
"fmt" | |
"github.com/aws/aws-lambda-go/events" | |
) | |
// Why This Is Needed | |
// ================== | |
// | |
// Rather than reusing the same code Amazon has repeated the implementation of DynamoDB "attribute value" structures in three | |
// separate SDK APIs: the `service/dynamodb` DynamoDB read-write API, the `service/dynamodbstreams` CDC stream handling API, | |
// and the `aws-lambda-go/events` Lambda event handling API. Sadly, only the DynamoDB `service/dynamodb` API supports | |
// marshalling and unmarshalling to and from custom application structure types. | |
// | |
// With the `feature/dynamodb/attributevalue` package, Amazon has provided a mechanism to easily convert from the | |
// `service/dynamodbstreams` attribute value representation to the `service/dynamodb` read-write API representation, | |
// explicitly in order to support marshalling and unmarshalling to and from custom application structures. However, they | |
// have not matched that capability for the `aws-lambda-go/events` Lambda event handling API representation. | |
// | |
// The result is that Lambda authors have to write a great deal of tedious code to manually walk the received image structures,
// copying them element by element into the format that they actually need before they can do any useful work. The
// purpose of this project is to demonstrate how a non-application-specific library module can be implemented to simplify
// Lambda interpretation of DynamoDB CDC event streams.
// | |
// | |
// How To Use This Package | |
// ======================= | |
// | |
// This ddbevent package provides a direct replacement for the DynamoDBEvent type found in the AWS SDK aws-lambda-go/events
// package. Simply swap the aws-lambda-go/events package reference in your DynamoDB CDC stream Lambda's handler implementation
// for this ddbevent package:
// | |
// func (h *Handler) HandleRequest(ctx context.Context, event ddbevent.DynamoDBEvent) error { | |
// // ... | |
// } | |
// | |
// Now imagine that you only care about the new image for inserted or modified DynamoDB "product items", your | |
// `HandleRequest` implementation might look something like this: | |
// | |
// func (h *Handler) HandleRequest(ctx context.Context, event ddbevent.DynamoDBEvent) error { | |
// err := h.processRecords(&event.Records) | |
// // ... | |
// } | |
// | |
// and the `processRecords` function would look something like this: | |
// | |
// func (h *Handler) processRecords(records *[]ddbevent.DynamoDBEventRecord) error { | |
// for _, rec := range *records { | |
// if rec.ChangeType == ddbevent.Insert || rec.ChangeType == ddbevent.Modify { | |
// newImage := &ProductItem{} | |
//             err := rec.UnmarshalNewImage(newImage)
// // ... | |
// } | |
// | |
// // ... | |
// } | |
// } | |
// | |
// That's all you would need to write to populate a custom `ProductItem` structure with data from a | |
// DynamoDB record image in a CDC event. | |
// | |
// In addition to UnmarshalNewImage, this package also provides the corresponding UnmarshalOldImage function | |
// and a foundational Transform function. | |
// | |
// Both of the Unmarshal...Image functions invoke Transform, so you don't typically need to know about it. However, if
// you want the benefit of conversion from the aws-lambda-go/events types to service/dynamodb types without
// unmarshalling whole images, you can invoke Transform yourself and then use the unmarshal capability on just the
// subsets of the image that you care about.
import ( | |
munm "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" | |
avtype "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" | |
) | |
// DynamoDBEvent defines a structure that describes the DynamoDB stream event JSON that is passed as an | |
// event parameter to a CDC stream handling Lambda handler function. | |
// | |
// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html for more | |
// information about the JSON content of a DynamoDB stream event. | |
type DynamoDBEvent struct { | |
// Records is an array of structures each of which represents the changes made to a single | |
// DynamoDB item / record. Depending on the load on the DynamoDB table, this array might contain | |
// anywhere from 1 to 100 entries. The upper limit on the batch size may be configured in the CDK | |
// infrastructure definition for the table. | |
Records []DynamoDBEventRecord `json:"Records"` | |
} | |
// DynamoDBEventRecord represents the changes to a single DynamoDB item / record. | |
type DynamoDBEventRecord struct { | |
Change DynamoDBStreamRecord `json:"dynamodb"` | |
ChangeType ChangeType `json:"eventName"` | |
EventID string `json:"eventID"` | |
// ... more fields if needed: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html | |
// isTransformed will be set to true when the Transform method is called for the first time. It is used to | |
// prevent multiple invocations of Transform from wasting effort repeating work that has already been done. | |
isTransformed bool | |
// transformErr is a large robot that can morph into a transportation vehicle, no, wait, it records whether | |
// or not an error was encountered while attempting to execute the Transform function. | |
transformErr error | |
} | |
// DynamoDBStreamRecord wraps both the old and the new image for a DynamoDB record in the CDC stream, i.e. the before and after. | |
// For creation events, only the NewImage will be populated; for removals only the OldImage; for modifications, both will be | |
// populated allowing comparisons between before and after. | |
type DynamoDBStreamRecord struct { | |
// Values found in the DynamoDB JSON Lambda event | |
Keys map[string]events.DynamoDBAttributeValue `json:"Keys"` | |
OldImageRaw map[string]events.DynamoDBAttributeValue `json:"OldImage"` | |
NewImageRaw map[string]events.DynamoDBAttributeValue `json:"NewImage"` | |
// ... more fields if needed: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html | |
// The "event" images found in the Lambda JSON stream may be cloned into these regular get/put API AttributeValue | |
// equivalents using the Transform function. Once populated, these can then easily be marshaled into application | |
// specific structures with two lines of code something like this: | |
// | |
// newImage := &types.ProviderItem{} | |
// err = record.UnmarshalNewImage(newImage) | |
// | |
// AWS has stupidly managed to define three identical DynamoDB attribute value structures between the | |
// DynamoDB read/write API, DynamoDB Stream API, and Lambda Event API and then failed to give the Lambda event | |
// version the essential unmarshalling capability of the read write API and no way to transform into it :-( | |
OldImage map[string]avtype.AttributeValue | |
NewImage map[string]avtype.AttributeValue | |
} | |
// ChangeType is an enumerated string type saying whether a DynamoDBEventRecord represents the
// creation, update, or removal of an item. It is decoded from the record's "eventName" JSON value.
type ChangeType string

// The ChangeType values understood by this package.
const (
	// Insert flags the insertion of a new item in the DynamoDB table. Records of this type carry
	// only the new image; there is no old image to pass along.
	Insert ChangeType = "INSERT"

	// Modify flags the update of a previously existing item in the DynamoDB table. Records of this
	// type carry both the old and the new image, which may be compared to determine exactly what changed.
	Modify ChangeType = "MODIFY"

	// Remove flags the deletion of a previously existing item from the DynamoDB table. Records of
	// this type carry only the old image, there being no new image to pass along.
	Remove ChangeType = "REMOVE"

	// Undefined signals that the change type could not be determined. It is typically used only
	// as a placeholder when handling an error condition.
	Undefined ChangeType = "UNDEFINED"
)
// UnmarshalNewImage and UnmarshalOldImage may be used on any DynamoDBEventRecord to unmarshal the image | |
// to an application specific structure. If the Transform function has has not already been invoked on | |
// the record, that will be done before the actual unmarshalling is attempted. | |
func (r *DynamoDBEventRecord) UnmarshalNewImage(out interface{}) error { | |
// Make sure we are in a fits state to attempt unmarshalling | |
err := r.Transform() | |
if err != nil { | |
return err | |
} | |
// Attempt the unmarshalling (close your eyes, don't look the Medusa in the face) | |
return munm.UnmarshalMap(r.Change.NewImage, out) | |
} | |
// UnmarshalOldImage and UnmarshalNewImage may be used on any DynamoDBEventRecord to unmarshal the image | |
// to an application specific structure. If the Transform function has not already been invoked on | |
// the record, that will be done before the actual unmarshalling is attempted. | |
func (r *DynamoDBEventRecord) UnmarshalOldImage(out interface{}) error { | |
// Make sure we are in a fits state to attempt unmarshalling | |
err := r.Transform() | |
if err != nil { | |
return err | |
} | |
// Attempt the unmarshalling (close your eyes, don't look the Medusa in the face) | |
return munm.UnmarshalMap(r.Change.OldImage, out) | |
} | |
// Transform copies and converts the new and old image values from the DynamoDB Lambda event version of the images | |
// into a form that can be easily unmarshalled into application defined structures, something that the native raw | |
// Lambda event implementation is unable to do. | |
// | |
// Calling this function more than once is harmless and has no effect; the function knows if it has already been invoked on | |
// a given DynamoDBEventRecord and will not waste time repeating the work. If you are not certain whether Transform has already | |
// been invoked, calling it again is the equivalent of testing a flag (that to do not have access to) and calling Transform if | |
// it is not set so, if in doubt, just call Transform and relax. | |
func (r *DynamoDBEventRecord) Transform() error { | |
// Have we already been invoked we have nothing more to do | |
if r.isTransformed { | |
return r.transformErr | |
} | |
// Is there an old image to transform? | |
if r.ChangeType == Modify || r.ChangeType == Remove { | |
// Transform the old image from Lambda event form to the read write API form | |
r.Change.OldImage, r.transformErr = transformAVMap(r.Change.OldImageRaw) | |
} | |
// Is there a new image to transform? | |
if r.transformErr == nil && (r.ChangeType == Modify || r.ChangeType == Insert) { | |
// Transform the old image from Lambda event form to read write API form | |
r.Change.NewImage, r.transformErr = transformAVMap(r.Change.NewImageRaw) | |
} | |
// Flag that we have attempted this transformation before and do not need to repeat it | |
r.isTransformed = true | |
// It's almost certain that all went well but someday the planets might align otherwise | |
return r.transformErr | |
} | |
// transformAV converts a single DynamoDB Lambda event attribute value to an equivalent DynamoDB read write API attribute value. | |
func transformAV(rawValue events.DynamoDBAttributeValue) (avtype.AttributeValue, error) { | |
// Build our response here | |
var result avtype.AttributeValue | |
var err error | |
// What we have to do depends on the type of the raw value input | |
switch rawValue.DataType() { | |
case events.DataTypeBinary: | |
result = &avtype.AttributeValueMemberB{ | |
Value: rawValue.Binary(), | |
} | |
case events.DataTypeBoolean: | |
result = &avtype.AttributeValueMemberBOOL{ | |
Value: rawValue.Boolean(), | |
} | |
case events.DataTypeBinarySet: | |
result = &avtype.AttributeValueMemberBS{ | |
Value: rawValue.BinarySet(), | |
} | |
case events.DataTypeList: | |
l := avtype.AttributeValueMemberL{} | |
l.Value, err = transformAVSlice(rawValue.List()) | |
result = &l | |
case events.DataTypeMap: | |
m := avtype.AttributeValueMemberM{} | |
m.Value, err = transformAVMap(rawValue.Map()) | |
result = &m | |
case events.DataTypeNumber: | |
result = &avtype.AttributeValueMemberN{ | |
Value: rawValue.Number(), | |
} | |
case events.DataTypeNumberSet: | |
result = &avtype.AttributeValueMemberNS{ | |
Value: rawValue.NumberSet(), | |
} | |
case events.DataTypeNull: | |
result = &avtype.AttributeValueMemberNULL{ | |
Value: true, | |
} | |
case events.DataTypeString: | |
result = &avtype.AttributeValueMemberS{ | |
Value: rawValue.String(), | |
} | |
case events.DataTypeStringSet: | |
result = &avtype.AttributeValueMemberSS{ | |
Value: rawValue.StringSet(), | |
} | |
default: | |
// Seriously, AWS must have added a new data type or something unexpected :-( | |
err = fmt.Errorf("encountered unrecognized event attribute value data type: %d", rawValue.DataType()) | |
} | |
// Well, that was long-winded but not so hard really | |
return result, err | |
} | |
// transformAVSlice converts a slice of DynamoDB Lambda event attribute values to an equivalent slice of | |
// DynamoDB read write API attribute values. | |
func transformAVSlice(rawAVSlice []events.DynamoDBAttributeValue) ([]avtype.AttributeValue, error) { | |
// Build our result here | |
result := make([]avtype.AttributeValue, len(rawAVSlice)) | |
// Iterate over the range of the map, converting each individual named attribute value | |
var err error | |
for index, rawValue := range rawAVSlice { | |
result[index], err = transformAV(rawValue) | |
if err != nil { | |
return result, err | |
} | |
} | |
// All done, return the profit from our efforts | |
return result, nil | |
} | |
// transformAVMap converts a map of DynamoDB Lambda event attribute values to an equivalent map of | |
// DynamoDB read write API attribute values. | |
func transformAVMap(rawAVMap map[string]events.DynamoDBAttributeValue) (map[string]avtype.AttributeValue, error) { | |
// Build our result here | |
result := make(map[string]avtype.AttributeValue) | |
// Iterate over the range of the map, converting each individual named attribute value | |
var err error | |
for key, rawValue := range rawAVMap { | |
result[key], err = transformAV(rawValue) | |
if err != nil { | |
return result, err | |
} | |
} | |
// All done, return the profit from our efforts | |
return result, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment