Created
June 15, 2023 14:17
-
-
Save Hunrik/64e77075e3eb4c128db771196021d725 to your computer and use it in GitHub Desktop.
SQS Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sqs | |
import ( | |
"context" | |
"github.com/cloudevents/sdk-go/pkg/cloudevents" | |
) | |
// EventRouter can be used to route events to subscribe handlers to a specific event type | |
type EventRouter struct { | |
eventHandlers map[string]ConsumerFn | |
defaultHandler ConsumerFn | |
} | |
func NewEventRouter(defaultHandler ConsumerFn) *EventRouter { | |
return &EventRouter{ | |
eventHandlers: make(map[string]ConsumerFn), | |
defaultHandler: defaultHandler, | |
} | |
} | |
func (r *EventRouter) RegisterHandler(eventType string, fn ConsumerFn) { | |
r.eventHandlers[eventType] = fn | |
} | |
func (r *EventRouter) HandleEvent(ctx context.Context, event *cloudevents.Event) bool { | |
fn, ok := r.eventHandlers[event.Type()] | |
if !ok { | |
if r.defaultHandler != nil { | |
return r.defaultHandler(ctx, event) | |
} | |
return true // Message can be deleted as there is no handler | |
} | |
return fn(ctx, event) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sqs | |
import ( | |
"context" | |
"fmt" | |
"sync" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/service/sqs" | |
"github.com/cloudevents/sdk-go/pkg/cloudevents" | |
) | |
type Publishable interface { | |
GetTopic() string | |
GetPayload() ([]byte, error) | |
GetEvent() cloudevents.Event | |
} | |
type Publisher struct { | |
sqs *sqs.Client | |
urlCache map[string]string | |
sync.RWMutex | |
} | |
func NewPublisher(cfg aws.Config) *Publisher { | |
return &Publisher{ | |
sqs: sqs.NewFromConfig(cfg), | |
urlCache: make(map[string]string), | |
} | |
} | |
func (p *Publisher) Publish(ctx context.Context, event Publishable) error { | |
payload, err := event.GetPayload() | |
if err != nil { | |
return err | |
} | |
// @todo: cache the queue url | |
queueURL := p.resolveQueueUrl(event.GetTopic()) | |
msg := &sqs.SendMessageInput{ | |
QueueUrl: &queueURL, | |
MessageBody: aws.String(string(payload)), | |
} | |
_, err = p.sqs.SendMessage(ctx, msg) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
func (p *Publisher) resolveQueueUrl(topic string) string { | |
tpl := "https://sqs.%s.amazonaws.com/%s/%s" | |
return fmt.Sprintf(tpl, "", "", topic) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sqs | |
import ( | |
"context" | |
"fmt" | |
"sync" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/config" | |
"github.com/aws/aws-sdk-go-v2/service/sts" | |
) | |
const queueTemplate = "https://sqs.%s.amazonaws.com/%s/%s" | |
var AWSAccountID, AWSRegion string | |
var once sync.Once | |
func ResolveQueueUrl(cfg aws.Config, topic string) (string, error) { | |
once.Do(func() { | |
cfg, err := config.LoadDefaultConfig(context.TODO()) | |
if err != nil { | |
panic(err) | |
} | |
identity, err := sts.NewFromConfig(cfg). | |
GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{}) | |
if err != nil { | |
panic(err) | |
} | |
AWSRegion = cfg.Region | |
AWSAccountID = *identity.Account | |
}) | |
return fmt.Sprintf(queueTemplate, AWSRegion, AWSAccountID, topic), nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sqs | |
import ( | |
"context" | |
"encoding/json" | |
"sync" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/service/sqs" | |
"github.com/aws/aws-sdk-go-v2/service/sqs/types" | |
"github.com/cloudevents/sdk-go/pkg/cloudevents" | |
"go.uber.org/zap" | |
) | |
type ConsumerFn func(ctx context.Context, msg *cloudevents.Event) bool | |
type Consumer struct { | |
sqs *sqs.Client | |
queueURL string | |
} | |
func NewConsumer(cfg aws.Config, queueURL string) (*Consumer, error) { | |
client := sqs.NewFromConfig(cfg) | |
return &Consumer{sqs: client, queueURL: queueURL}, nil | |
} | |
// @TODO: Change it to use the prefetcher pattern so the slowest message won't block | |
func (c *Consumer) Run(ctx context.Context, fn ConsumerFn) error { | |
params := &sqs.ReceiveMessageInput{ | |
QueueUrl: &c.queueURL, | |
MaxNumberOfMessages: 10, | |
// VisibilityTimeout: 30, | |
WaitTimeSeconds: 5, | |
} | |
for { | |
if err := ctx.Err(); err != nil { | |
return err | |
} | |
resp, err := c.sqs.ReceiveMessage(ctx, params) | |
if err != nil { | |
return err | |
} | |
for _, msg := range resp.Messages { | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go func(msg types.Message) { | |
defer wg.Done() | |
ctx := context.Background() | |
event := cloudevents.New() | |
err := json.Unmarshal([]byte(*msg.Body), &event) | |
if err != nil { | |
_, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{ | |
ReceiptHandle: msg.ReceiptHandle, | |
}) | |
if err != nil { | |
zap.L().Error("Failed to delete messages", zap.Error(err)) | |
// @TODO: Better error handling | |
} | |
} | |
canDelete := fn(ctx, &event) | |
if !canDelete { | |
return | |
} | |
_, err = c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{ | |
ReceiptHandle: msg.ReceiptHandle, | |
}) | |
if err != nil { | |
zap.L().Error("Failed to delete messages", zap.Error(err)) | |
// @TODO: Better error handling | |
} | |
}(msg) | |
wg.Wait() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment