Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save pmgtivo/5e05afe96a9451e0b4d4857e60a33fae to your computer and use it in GitHub Desktop.
Save pmgtivo/5e05afe96a9451e0b4d4857e60a33fae to your computer and use it in GitHub Desktop.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// Table, index and key names used by HandleRequest.
const (
// TableName is the source table that HandleRequest queries.
TableName = "Linpub_flattener_test"
// GSIName is the index on TableName that the query runs against.
GSIName = "GSI1"
// TargetGSI is the GSI_PK partition-key value selected by the query
// (a key value, not an index name).
TargetGSI = "EMAIL-1.3m@example.com"
// TargetTable is the table the queried items are written to.
TargetTable = "Linpub_flattener_test_target"
)
// StatsResponse is the JSON result returned by HandleRequest.
type StatsResponse struct {
// TotalItems is the sum of the Count values reported by every query page.
TotalItems int `json:"total_items"`
// TotalLatencyMS is the wall-clock time, in milliseconds, spent in the
// query-and-write loop.
TotalLatencyMS int64 `json:"total_latency_ms"`
// AvgLatencyMS is TotalLatencyMS divided by TotalItems; it is 0 when no
// items were returned.
AvgLatencyMS float64 `json:"avg_latency_per_item_ms"`
}
// SimpleItem pairs a partition key with a name.
// NOTE(review): nothing in the visible file references this type — confirm
// whether it is still needed.
type SimpleItem struct {
PK string `json:"pk"`
Name string `json:"name"`
}
// HandleRequest is the Lambda handler. It makes sure TargetTable exists, pages
// through every item of TableName whose GSI_PK equals TargetGSI (via index
// GSIName), copies the PK and Name attributes of each item into TargetTable,
// and reports how many items the query returned and how long the whole
// query-and-write loop took.
//
// It returns an error, and a zero StatsResponse, when the AWS configuration
// cannot be loaded, the target table cannot be prepared, a query page fails,
// or a batch cannot be written completely. Write failures are returned rather
// than only logged so that a partial copy is never reported as a success.
func HandleRequest(ctx context.Context) (StatsResponse, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return StatsResponse{}, fmt.Errorf("failed to load AWS config: %w", err)
	}
	svc := dynamodb.NewFromConfig(cfg)
	if err := ensureTargetTableExists(ctx, svc, TargetTable); err != nil {
		return StatsResponse{}, err
	}

	// BatchWriteItem accepts at most 25 write requests per call.
	const batchSize = 25

	var (
		totalItems int
		skipped    int
		lastKey    map[string]types.AttributeValue
	)
	pending := make([]types.WriteRequest, 0, batchSize)
	// pendingIndex maps a PK to its position in pending. BatchWriteItem
	// rejects a whole batch that contains the same key twice, so a repeated
	// PK replaces the earlier request (the outcome sequential puts would have).
	pendingIndex := make(map[string]int, batchSize)

	// flush writes the pending batch and resets the batch state.
	flush := func() error {
		if len(pending) == 0 {
			return nil
		}
		if err := batchWriteWithRetry(ctx, svc, pending); err != nil {
			return err
		}
		pending = pending[:0]
		for k := range pendingIndex {
			delete(pendingIndex, k)
		}
		return nil
	}

	start := time.Now()
	for {
		output, err := svc.Query(ctx, &dynamodb.QueryInput{
			TableName:              aws.String(TableName),
			IndexName:              aws.String(GSIName),
			KeyConditionExpression: aws.String("GSI_PK = :pk"),
			ExpressionAttributeValues: map[string]types.AttributeValue{
				":pk": &types.AttributeValueMemberS{Value: TargetGSI},
			},
			ExclusiveStartKey: lastKey,
			Limit:             aws.Int32(1000),
		})
		if err != nil {
			return StatsResponse{}, fmt.Errorf("query error: %w", err)
		}

		for _, item := range output.Items {
			pkAttr, ok := item["PK"].(*types.AttributeValueMemberS)
			if !ok || pkAttr.Value == "" {
				// An empty key value is rejected by DynamoDB and would fail
				// the entire batch, so such items are left out.
				skipped++
				continue
			}
			name := ""
			if v, ok := item["Name"].(*types.AttributeValueMemberS); ok {
				name = v.Value
			}
			req := types.WriteRequest{
				PutRequest: &types.PutRequest{
					Item: map[string]types.AttributeValue{
						"PK":   &types.AttributeValueMemberS{Value: pkAttr.Value},
						"Name": &types.AttributeValueMemberS{Value: name},
					},
				},
			}
			if i, dup := pendingIndex[pkAttr.Value]; dup {
				pending[i] = req
				continue
			}
			pendingIndex[pkAttr.Value] = len(pending)
			pending = append(pending, req)

			if len(pending) == batchSize {
				if err := flush(); err != nil {
					return StatsResponse{}, err
				}
			}
		}
		// Write whatever is left of this page before fetching the next one.
		if err := flush(); err != nil {
			return StatsResponse{}, err
		}

		totalItems += int(output.Count)
		if len(output.LastEvaluatedKey) == 0 {
			break
		}
		lastKey = output.LastEvaluatedKey
	}
	duration := time.Since(start)

	if skipped > 0 {
		fmt.Printf("skipped %d items without a non-empty string PK\n", skipped)
	}

	avgLatency := 0.0
	if totalItems > 0 {
		avgLatency = float64(duration.Milliseconds()) / float64(totalItems)
	}
	return StatsResponse{
		TotalItems:     totalItems,
		TotalLatencyMS: duration.Milliseconds(),
		AvgLatencyMS:   avgLatency,
	}, nil
}

// batchWriteWithRetry sends requests (at most 25) to TargetTable with
// BatchWriteItem and resubmits whatever DynamoDB reports back in
// UnprocessedItems, backing off exponentially between attempts. It returns an
// error when the call itself fails, when ctx ends during a back-off, or when
// items are still unprocessed after the last attempt.
func batchWriteWithRetry(ctx context.Context, svc *dynamodb.Client, requests []types.WriteRequest) error {
	const (
		maxAttempts = 8
		maxBackoff  = 2 * time.Second
	)
	backoff := 50 * time.Millisecond
	remaining := requests

	for attempt := 1; ; attempt++ {
		out, err := svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]types.WriteRequest{
				TargetTable: remaining,
			},
		})
		if err != nil {
			return fmt.Errorf("failed to batch write items: %w", err)
		}

		// A successful response can still leave some requests unapplied
		// (for example under throttling); those must be sent again.
		remaining = out.UnprocessedItems[TargetTable]
		if len(remaining) == 0 {
			return nil
		}
		if attempt == maxAttempts {
			return fmt.Errorf("failed to batch write items: %d items still unprocessed after %d attempts", len(remaining), maxAttempts)
		}

		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("failed to batch write items: %w", ctx.Err())
		case <-timer.C:
		}
		if backoff < maxBackoff {
			backoff *= 2
		}
	}
}
// ensureTargetTableExists makes sure tableName exists and is ready for writes.
//
// If DescribeTable finds the table, it returns nil, first waiting for the
// table to become ACTIVE when it is still being created. If the table is not
// found, it creates an on-demand table keyed by the string attribute "PK",
// tagged with the Tivo* tags, and blocks until the table is ACTIVE: a table
// that is still in CREATING status rejects writes, so returning earlier would
// make the caller's first batch writes fail. A create that loses a race with
// another creator (ResourceInUseException) is treated as success and also
// waited on.
//
// Any other DescribeTable or CreateTable error, or a failed wait, is returned
// wrapped. The wait is bounded by both ctx and an internal timeout.
func ensureTargetTableExists(ctx context.Context, svc *dynamodb.Client, tableName string) error {
	// Upper bound for the table to reach ACTIVE; ctx may end the wait sooner.
	const activeTimeout = 5 * time.Minute

	describeInput := &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}
	waitUntilActive := func() error {
		waiter := dynamodb.NewTableExistsWaiter(svc)
		if err := waiter.Wait(ctx, describeInput, activeTimeout); err != nil {
			return fmt.Errorf("waiting for target table to become active: %w", err)
		}
		fmt.Println("Target table is active.")
		return nil
	}

	// Check if table exists
	described, err := svc.DescribeTable(ctx, describeInput)
	if err == nil {
		fmt.Println("Target table already exists.")
		if described.Table != nil && described.Table.TableStatus == types.TableStatusCreating {
			return waitUntilActive()
		}
		return nil
	}

	var notFound *types.ResourceNotFoundException
	if !errors.As(err, &notFound) {
		return fmt.Errorf("error describing table: %w", err)
	}

	fmt.Println("Target table does not exist. Creating...")
	_, err = svc.CreateTable(ctx, &dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []types.AttributeDefinition{
			{
				AttributeName: aws.String("PK"),
				AttributeType: types.ScalarAttributeTypeS,
			},
		},
		KeySchema: []types.KeySchemaElement{
			{
				AttributeName: aws.String("PK"),
				KeyType:       types.KeyTypeHash,
			},
		},
		BillingMode: types.BillingModePayPerRequest, // On-demand
		Tags: []types.Tag{
			{
				Key:   aws.String("TivoService"),
				Value: aws.String("Linpub"),
			},
			{
				Key:   aws.String("TivoEnv"),
				Value: aws.String("bclab1"),
			},
			{
				Key:   aws.String("TivoOwner"),
				Value: aws.String("linpub-dev@xperi.com"),
			},
			{
				Key:   aws.String("TivoTTL"),
				Value: aws.String("AlwaysOn"),
			},
		},
	})

	var inUse *types.ResourceInUseException
	switch {
	case err == nil:
		fmt.Println("Target table creation initiated.")
	case errors.As(err, &inUse):
		// Someone else created the table between DescribeTable and
		// CreateTable; it only remains to wait for it.
	default:
		return fmt.Errorf("failed to create target table: %w", err)
	}
	return waitUntilActive()
}
// main hands HandleRequest to the Lambda runtime via lambda.Start.
func main() {
lambda.Start(HandleRequest)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment