Created
May 19, 2025 11:07
-
-
Save pmgtivo/5e05afe96a9451e0b4d4857e60a33fae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/aws/aws-lambda-go/lambda" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/config" | |
"github.com/aws/aws-sdk-go-v2/service/dynamodb" | |
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types" | |
) | |
const ( | |
TableName = "Linpub_flattener_test" | |
GSIName = "GSI1" | |
TargetGSI = "EMAIL-1.3m@example.com" | |
TargetTable = "Linpub_flattener_test_target" | |
) | |
type StatsResponse struct { | |
TotalItems int `json:"total_items"` | |
TotalLatencyMS int64 `json:"total_latency_ms"` | |
AvgLatencyMS float64 `json:"avg_latency_per_item_ms"` | |
} | |
type SimpleItem struct { | |
PK string `json:"pk"` | |
Name string `json:"name"` | |
} | |
func HandleRequest(ctx context.Context) (StatsResponse, error) { | |
cfg, err := config.LoadDefaultConfig(ctx) | |
if err != nil { | |
return StatsResponse{}, fmt.Errorf("failed to load AWS config: %w", err) | |
} | |
svc := dynamodb.NewFromConfig(cfg) | |
dbTableErr := ensureTargetTableExists(ctx, svc, TargetTable) | |
if dbTableErr != nil { | |
return StatsResponse{}, dbTableErr | |
} | |
var totalItems int | |
var lastKey map[string]types.AttributeValue | |
start := time.Now() | |
for { | |
output, err := svc.Query(ctx, &dynamodb.QueryInput{ | |
TableName: aws.String(TableName), | |
IndexName: aws.String(GSIName), | |
KeyConditionExpression: aws.String("GSI_PK = :pk"), | |
ExpressionAttributeValues: map[string]types.AttributeValue{ | |
":pk": &types.AttributeValueMemberS{Value: TargetGSI}, | |
}, | |
ExclusiveStartKey: lastKey, | |
Limit: aws.Int32(1000), | |
}) | |
// time.Sleep(2 * time.Millisecond) | |
if err != nil { | |
return StatsResponse{}, fmt.Errorf("query error: %w", err) | |
} | |
var writeRequests []types.WriteRequest | |
const batchSize = 25 | |
for _, item := range output.Items { | |
pk := "" | |
name := "" | |
if v, ok := item["PK"].(*types.AttributeValueMemberS); ok { | |
pk = v.Value | |
} | |
if v, ok := item["Name"].(*types.AttributeValueMemberS); ok { | |
name = v.Value | |
} | |
writeRequests = append(writeRequests, types.WriteRequest{ | |
PutRequest: &types.PutRequest{ | |
Item: map[string]types.AttributeValue{ | |
"PK": &types.AttributeValueMemberS{Value: pk}, | |
"Name": &types.AttributeValueMemberS{Value: name}, | |
}, | |
}, | |
}) | |
// If batch is full, write it | |
if len(writeRequests) == batchSize { | |
_, err := svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ | |
RequestItems: map[string][]types.WriteRequest{ | |
TargetTable: writeRequests, | |
}, | |
}) | |
if err != nil { | |
fmt.Printf("failed to batch write items: %v\n", err) | |
} | |
writeRequests = writeRequests[:0] // reset batch | |
} | |
} | |
// Write any remaining items | |
if len(writeRequests) > 0 { | |
_, err := svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ | |
RequestItems: map[string][]types.WriteRequest{ | |
TargetTable: writeRequests, | |
}, | |
}) | |
if err != nil { | |
fmt.Printf("failed to batch write items: %v\n", err) | |
} | |
} | |
totalItems += int(output.Count) | |
if output.LastEvaluatedKey == nil { | |
break | |
} | |
lastKey = output.LastEvaluatedKey | |
} | |
duration := time.Since(start) | |
avgLatency := 0.0 | |
if totalItems > 0 { | |
avgLatency = float64(duration.Milliseconds()) / float64(totalItems) | |
} | |
return StatsResponse{ | |
TotalItems: totalItems, | |
TotalLatencyMS: duration.Milliseconds(), | |
AvgLatencyMS: avgLatency, | |
}, nil | |
} | |
func ensureTargetTableExists(ctx context.Context, svc *dynamodb.Client, tableName string) error { | |
// Check if table exists | |
_, err := svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{ | |
TableName: aws.String(tableName), | |
}) | |
if err == nil { | |
fmt.Println("Target table already exists.") | |
return nil | |
} | |
var notFound *types.ResourceNotFoundException | |
if errors.As(err, ¬Found) { | |
fmt.Println("Target table does not exist. Creating...") | |
_, err = svc.CreateTable(ctx, &dynamodb.CreateTableInput{ | |
TableName: aws.String(tableName), | |
AttributeDefinitions: []types.AttributeDefinition{ | |
{ | |
AttributeName: aws.String("PK"), | |
AttributeType: types.ScalarAttributeTypeS, | |
}, | |
}, | |
KeySchema: []types.KeySchemaElement{ | |
{ | |
AttributeName: aws.String("PK"), | |
KeyType: types.KeyTypeHash, | |
}, | |
}, | |
BillingMode: types.BillingModePayPerRequest, // On-demand | |
Tags: []types.Tag{ | |
{ | |
Key: aws.String("TivoService"), | |
Value: aws.String("Linpub"), | |
}, | |
{ | |
Key: aws.String("TivoEnv"), | |
Value: aws.String("bclab1"), | |
}, | |
{ | |
Key: aws.String("TivoOwner"), | |
Value: aws.String("linpub-dev@xperi.com"), | |
}, | |
{ | |
Key: aws.String("TivoTTL"), | |
Value: aws.String("AlwaysOn"), | |
}, | |
}, | |
}) | |
if err != nil { | |
return fmt.Errorf("failed to create target table: %w", err) | |
} | |
fmt.Println("Target table creation initiated.") | |
return nil | |
} | |
return fmt.Errorf("error describing table: %w", err) | |
} | |
func main() { | |
lambda.Start(HandleRequest) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment