Skip to content

Instantly share code, notes, and snippets.

@chris
Last active January 12, 2023 02:34
Show Gist options
  • Save chris/c94014174a8a807c58080ebaec143278 to your computer and use it in GitHub Desktop.
Save chris/c94014174a8a807c58080ebaec143278 to your computer and use it in GitHub Desktop.
DynamoDB streams lambda trigger handler in Go
package main
import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)
// Retry policy used by Handler when loading the item behind a stream record.

// maxRetries is the maximum number of load attempts made for a single record
// before it is logged and skipped.
const maxRetries = 5

// retryInterval is how long Handler sleeps between two consecutive attempts.
const retryInterval = time.Second
// processableEvent reports whether a DynamoDB stream record should be handled.
// Only INSERT records whose partition key value starts with
// repos.UnprocessedEventPrefix followed by "#" qualify; every other record is
// ignored.
//
// This duplicates what the stream filters are expected to enforce and exists
// purely as a safety net.
func processableEvent(e events.DynamoDBEventRecord) bool {
	if e.EventName != "INSERT" {
		return false
	}
	// NOTE(review): String() is called on the key attribute without checking
	// its type or presence - confirm the partition key is always a string.
	pk := e.Change.Keys[repos.PK].String()
	wantPrefix := repos.UnprocessedEventPrefix + "#"
	return strings.HasPrefix(pk, wantPrefix)
}
// Handler is the Lambda entry point for a batch of DynamoDB stream records.
//
// Records are handled strictly in the order they arrive. Records rejected by
// processableEvent are skipped. For every remaining record the item is loaded
// with getEvent, trying up to maxRetries times and sleeping retryInterval
// between attempts; a record that still cannot be loaded is logged and skipped.
//
// Handler currently always returns nil, so load failures never surface to the
// Lambda runtime as an invocation error.
func Handler(e events.DynamoDBEvent) error {
	// Process records linearly: order matters, so do not parallelize.
	//
	// The slice is deliberately not called "events": that name would shadow
	// the imported events package for the rest of this function.
	var loaded []models.MyEvent

	for _, rec := range e.Records {
		if !processableEvent(rec) {
			continue
		}

		// We cheat here and just fetch the item by its keys from DDB, instead
		// of having to do all the unmarshaling (where we'd have to convert
		// attribute types and then use the unmarshaler in our DDB Go library,
		// etc.)
		// TODO: as we scale up, we shouldn't do this as we're paying for an
		// extra read that we don't need (since all the data is here).
		//
		// The load occasionally fails even though a later manual query finds
		// the record (possibly DynamoDB read inconsistency). Rather than
		// returning the error - which would have Lambda retry the whole batch,
		// see https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html -
		// retry in-process a bounded number of times, then log and skip.
		for attempt := 1; attempt <= maxRetries; attempt++ {
			le, err := getEvent(rec.Change.Keys)
			if err == nil {
				loaded = append(loaded, *le)
				break
			}
			if attempt == maxRetries {
				log.Printf("ERROR: Unable to load Event (after %d retries) for %v: %v (skipping this event)\n", maxRetries, rec.Change.Keys, err)
				break
			}
			// Not the last attempt: wait before trying again.
			time.Sleep(retryInterval)
		}
	}

	// do something with the `loaded` events
	return nil
}
// main is the program entry point; it passes Handler to lambda.Start from
// aws-lambda-go so that Handler receives the DynamoDB stream invocations.
func main() {
lambda.Start(Handler)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment