Last active
January 12, 2023 02:34
-
-
Save chris/c94014174a8a807c58080ebaec143278 to your computer and use it in GitHub Desktop.
DynamoDB streams lambda trigger handler in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main
import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)
const (
	// maxRetries is the maximum number of attempts Handler makes to load the
	// item behind a single stream record before giving up on that record.
	maxRetries = 5
	// retryInterval is how long Handler sleeps between two failed attempts.
	retryInterval = 1 * time.Second
)
// processableEvent determines if the incoming event from the DynamoDB stream | |
// is one we should process. This returns true if it's an unprocessed event | |
// record and is an INSERT - everything else should be ignored. | |
func processableEvent(e events.DynamoDBEventRecord) bool { | |
// This is a safety measure, as our filters should handle this | |
return e.EventName == "INSERT" && strings.HasPrefix(e.Change.Keys[repos.PK].String(), repos.UnprocessedEventPrefix+"#") | |
} | |
func Handler(e events.DynamoDBEvent) error { | |
// process events linearly, as order matters, do not parallelize | |
var events []models.MyEvent | |
for _, v := range e.Records { | |
if !processableEvent(v) { | |
continue | |
} | |
for i := 1; i <= maxRetries; i++ { | |
// We cheat here and just fetch the item by its keys from DDB, instead of | |
// having to do all the unmarshaling (where we'd have to convert attribute | |
// types and then use the unmarshaler in our DDB Go library, etc.) | |
// TODO: as we scale up, we shouldn't do this as we're paying for an extra | |
// read that we don't need (since all the data is here). | |
le, err := getEvent(v.Change.Keys) | |
if err != nil { | |
// We do see this occasionally where this Lambda is unable to load the | |
// stream record. Yet, a later manual query finds the record. Seems it may | |
// be the read inconsistency of Dynamo? Log this, then return the error so | |
// that it will be retried (and presumably then find the records). | |
// Note, the entire stream will be retried, and because we accumulate all | |
// the records before processing, this is idempotent. See: | |
// https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html | |
if i == maxRetries { | |
log.Printf("ERROR: Unable to load Event (after %d retries) for %v: %v (skipping this event)\n", maxRetries, v.Change.Keys, err) | |
} else { | |
time.Sleep(retryInterval) | |
} | |
} else { | |
events = append(events, *le) | |
break | |
} | |
} | |
} | |
// do something with the `events` | |
return nil | |
} | |
func main() { | |
lambda.Start(Handler) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment