Skip to content

Instantly share code, notes, and snippets.

@SodaDev
Last active April 30, 2024 06:45
Show Gist options
  • Save SodaDev/8da27a2599a3d2ba51f641003657e821 to your computer and use it in GitHub Desktop.
Save SodaDev/8da27a2599a3d2ba51f641003657e821 to your computer and use it in GitHub Desktop.
package handler
import (
"context"
"fmt"
"github.com/Ryanair/gofrlib/log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/lambda"
"github.com/aws/aws-sdk-go-v2/service/lambda/types"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"os"
"otel-tracing/CbEventSourceManager/cmd/schema/aws/cloudwatch/cloudwatchalarmstatechange"
"strconv"
)
type LambdaHandler struct {
loggerConfig log.Configuration
lambdaClient *lambda.Client
}
func New(loggerConfig log.Configuration, client *lambda.Client) *LambdaHandler {
return &LambdaHandler{
loggerConfig: loggerConfig,
lambdaClient: client,
}
}
func (lh *LambdaHandler) Handle(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent) error {
log.Init(lh.loggerConfig)
log.Debug("Got event: %s", log.ToString(event))
mappings, err := lh.getEventSourceMappings(ctx)
if err != nil {
log.Error("Error getting event source mappings: %s", err)
return err
}
if event.Detail.State.Value == "ALARM" {
err := lh.openCircuit(ctx, event, &mappings.EventSourceMappings[0])
if err != nil {
return errors.Wrapf(err, "Error opening circuit")
}
return nil
} else if event.Detail.State.Value == "OK" {
err := lh.closeCircuit(ctx, event, &mappings.EventSourceMappings[0])
if err != nil {
return errors.Wrapf(err, "Error closing circuit")
}
return nil
} else if event.Detail.State.Value == "INSUFFICIENT_DATA" && event.Detail.PreviousState.Value == "ALARM" {
err := lh.halfOpen(ctx, event, &mappings.EventSourceMappings[0])
if err != nil {
return errors.Wrapf(err, "Error half-opening circuit")
}
return nil
}
return nil
}
func (lh *LambdaHandler) getEventSourceMappings(ctx context.Context) (*lambda.ListEventSourceMappingsOutput, error) {
mappings, err := lh.lambdaClient.ListEventSourceMappings(ctx, &lambda.ListEventSourceMappingsInput{
EventSourceArn: aws.String(os.Getenv("QUEUE_ARN")),
FunctionName: aws.String(os.Getenv("FUNCTION_ARN")),
})
if err != nil {
return nil, errors.Wrapf(err, "Error listing event source mappings")
}
if len(mappings.EventSourceMappings) == 0 {
return nil, errors.New("No event source mappings found")
}
if len(mappings.EventSourceMappings) > 1 {
return nil, errors.New(fmt.Sprintf("Multiple event source mappings found: %s", log.ToString(mappings)))
}
return mappings, nil
}
func (lh *LambdaHandler) openCircuit(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error {
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{
UUID: mapping.UUID,
Enabled: aws.Bool(false),
})
if err != nil {
return errors.Wrapf(err, "Error updating event source mapping")
}
instrumentCircuitStatusUpdate(ctx, "OPEN", event)
log.Debug("Updated mapping: %s", log.ToString(mappingOutput))
return nil
}
func (lh *LambdaHandler) closeCircuit(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error {
closedConcurrencyString := os.Getenv("CLOSED_CONCURRENCY")
closedConcurrency, err := strconv.Atoi(closedConcurrencyString)
if err != nil {
return errors.Wrapf(err, "Error converting closed concurrency to int")
}
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{
UUID: mapping.UUID,
Enabled: aws.Bool(true),
ScalingConfig: &types.ScalingConfig{
MaximumConcurrency: aws.Int32(int32(closedConcurrency)),
},
})
if err != nil {
return errors.Wrapf(err, "Error updating event source mapping")
}
instrumentCircuitStatusUpdate(ctx, "CLOSE", event)
log.Debug("Updated mapping: %s", log.ToString(mappingOutput))
return nil
}
func (lh *LambdaHandler) halfOpen(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error {
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{
UUID: mapping.UUID,
Enabled: aws.Bool(true),
ScalingConfig: &types.ScalingConfig{
MaximumConcurrency: aws.Int32(2),
},
})
if err != nil {
return errors.Wrapf(err, "Error updating event source mapping")
}
instrumentCircuitStatusUpdate(ctx, "HALF_OPEN", event)
log.Debug("Updated mapping: %s", log.ToString(mappingOutput))
return nil
}
func instrumentCircuitStatusUpdate(ctx context.Context, circuitState string, event cloudwatchalarmstatechange.AWSEvent) {
trace.SpanFromContext(ctx).AddEvent("circuitStateUpdate",
trace.WithAttributes(attribute.String("state", circuitState)),
trace.WithAttributes(attribute.String("alarmEvent", log.ToString(event.Detail))),
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment