Skip to content

Instantly share code, notes, and snippets.

@christopher-wong
Created August 13, 2021 20:13
Show Gist options
  • Save christopher-wong/86690769e1d32c57ec050024ddc12e74 to your computer and use it in GitHub Desktop.
Save christopher-wong/86690769e1d32c57ec050024ddc12e74 to your computer and use it in GitHub Desktop.
DynamoDB stream processor that handles polymorphic type conversion to JSON
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func main() {
lambda.Start(handleRequestV2)
}
// handleRequestV2 processes a batch of DynamoDB stream records. For each
// record it selects an image (OldImage for REMOVE events, NewImage for every
// other event name), converts it to a plain Go map via UnmarshalStreamImageV2,
// marshals that map to JSON, and prints the JSON to stdout.
//
// Processing stops at the first record that fails to convert or marshal; the
// failure is logged to stdout and the underlying error is returned. ctx is
// accepted to satisfy the handler signature but is not used here.
func handleRequestV2(ctx context.Context, e events.DynamoDBEvent) error {
	for _, record := range e.Records {
		fmt.Printf("Processing request data for event ID %s, type %s.\n", record.EventID, record.EventName)

		// REMOVE events are read from OldImage; all other events from NewImage.
		image := record.Change.NewImage
		if record.EventName == "REMOVE" {
			image = record.Change.OldImage
		}

		recordAsMap, err := UnmarshalStreamImageV2(image)
		if err != nil {
			// %w is only meaningful to fmt.Errorf; use %v when printing.
			fmt.Printf("ERROR: failed to unmarshal stream image: %v\n", err)
			return err
		}

		recordBytes, err := json.Marshal(recordAsMap)
		if err != nil {
			fmt.Printf("ERROR: failed to marshal record as JSON: %v\n", err)
			return err
		}

		fmt.Println(string(recordBytes))
	}
	return nil
}
// GetAttributeValue converts a single DynamoDB stream attribute value into a
// plain Go value, chosen by the attribute's DataType:
//
//   - scalar and set types are returned as whatever the matching accessor
//     (Binary, Boolean, BinarySet, Number, NumberSet, StringSet, String) yields;
//   - NULL attributes are returned as nil, so they marshal to JSON null rather
//     than being indistinguishable from a boolean true;
//   - lists become []interface{} with each element converted recursively;
//   - maps become map[string]interface{} with each value converted recursively.
//
// An error is returned when the data type is not one of the cases above, or
// when converting any nested element fails.
func GetAttributeValue(record events.DynamoDBAttributeValue) (interface{}, error) {
	switch record.DataType() {
	case events.DataTypeBinary:
		return record.Binary(), nil
	case events.DataTypeBoolean:
		return record.Boolean(), nil
	case events.DataTypeBinarySet:
		return record.BinarySet(), nil
	case events.DataTypeNumber:
		return record.Number(), nil
	case events.DataTypeNumberSet:
		return record.NumberSet(), nil
	case events.DataTypeNull:
		// Returning record.IsNull() here would emit `true` for a NULL
		// attribute; nil is the faithful representation.
		return nil, nil
	case events.DataTypeStringSet:
		return record.StringSet(), nil
	case events.DataTypeString:
		return record.String(), nil
	case events.DataTypeList:
		list := record.List()
		items := make([]interface{}, len(list))
		for i, item := range list {
			valueItem, err := GetAttributeValue(item)
			if err != nil {
				return nil, err
			}
			items[i] = valueItem
		}
		return items, nil
	case events.DataTypeMap:
		fields := record.Map()
		recordMap := make(map[string]interface{}, len(fields))
		for name, value := range fields {
			valueItem, err := GetAttributeValue(value)
			if err != nil {
				return nil, err
			}
			recordMap[name] = valueItem
		}
		return recordMap, nil
	default:
		return nil, fmt.Errorf("unexpected attributevalue type found: %+v", record.DataType())
	}
}
// UnmarshalStreamImageV2 converts a DynamoDB stream image (attribute name to
// attribute value) into a map of plain Go values by running every entry
// through GetAttributeValue.
//
// The returned map is always non-nil on success, even for an empty or nil
// image. If any attribute fails to convert, nil and that error are returned.
func UnmarshalStreamImageV2(attribute map[string]events.DynamoDBAttributeValue) (map[string]interface{}, error) {
	converted := make(map[string]interface{}, len(attribute))
	for key, attr := range attribute {
		value, err := GetAttributeValue(attr)
		if err != nil {
			return nil, err
		}
		converted[key] = value
	}
	return converted, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment