Skip to content

Instantly share code, notes, and snippets.

@Hunrik
Created June 15, 2023 14:17
Show Gist options
  • Save Hunrik/64e77075e3eb4c128db771196021d725 to your computer and use it in GitHub Desktop.
Save Hunrik/64e77075e3eb4c128db771196021d725 to your computer and use it in GitHub Desktop.
SQS Consumer
package sqs
import (
"context"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
)
// EventRouter can be used to route events to subscribe handlers to a specific event type
type EventRouter struct {
eventHandlers map[string]ConsumerFn
defaultHandler ConsumerFn
}
func NewEventRouter(defaultHandler ConsumerFn) *EventRouter {
return &EventRouter{
eventHandlers: make(map[string]ConsumerFn),
defaultHandler: defaultHandler,
}
}
func (r *EventRouter) RegisterHandler(eventType string, fn ConsumerFn) {
r.eventHandlers[eventType] = fn
}
func (r *EventRouter) HandleEvent(ctx context.Context, event *cloudevents.Event) bool {
fn, ok := r.eventHandlers[event.Type()]
if !ok {
if r.defaultHandler != nil {
return r.defaultHandler(ctx, event)
}
return true // Message can be deleted as there is no handler
}
return fn(ctx, event)
}
package sqs
import (
"context"
"fmt"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
)
type Publishable interface {
GetTopic() string
GetPayload() ([]byte, error)
GetEvent() cloudevents.Event
}
type Publisher struct {
sqs *sqs.Client
urlCache map[string]string
sync.RWMutex
}
func NewPublisher(cfg aws.Config) *Publisher {
return &Publisher{
sqs: sqs.NewFromConfig(cfg),
urlCache: make(map[string]string),
}
}
func (p *Publisher) Publish(ctx context.Context, event Publishable) error {
payload, err := event.GetPayload()
if err != nil {
return err
}
// @todo: cache the queue url
queueURL := p.resolveQueueUrl(event.GetTopic())
msg := &sqs.SendMessageInput{
QueueUrl: &queueURL,
MessageBody: aws.String(string(payload)),
}
_, err = p.sqs.SendMessage(ctx, msg)
if err != nil {
return err
}
return nil
}
func (p *Publisher) resolveQueueUrl(topic string) string {
tpl := "https://sqs.%s.amazonaws.com/%s/%s"
return fmt.Sprintf(tpl, "", "", topic)
}
package sqs
import (
"context"
"fmt"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sts"
)
const queueTemplate = "https://sqs.%s.amazonaws.com/%s/%s"
var AWSAccountID, AWSRegion string
var once sync.Once
func ResolveQueueUrl(cfg aws.Config, topic string) (string, error) {
once.Do(func() {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
panic(err)
}
identity, err := sts.NewFromConfig(cfg).
GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
if err != nil {
panic(err)
}
AWSRegion = cfg.Region
AWSAccountID = *identity.Account
})
return fmt.Sprintf(queueTemplate, AWSRegion, AWSAccountID, topic), nil
}
package sqs
import (
"context"
"encoding/json"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/cloudevents/sdk-go/pkg/cloudevents"
"go.uber.org/zap"
)
type ConsumerFn func(ctx context.Context, msg *cloudevents.Event) bool
type Consumer struct {
sqs *sqs.Client
queueURL string
}
func NewConsumer(cfg aws.Config, queueURL string) (*Consumer, error) {
client := sqs.NewFromConfig(cfg)
return &Consumer{sqs: client, queueURL: queueURL}, nil
}
// @TODO: Change it to use the prefetcher pattern so the slowest message won't block
func (c *Consumer) Run(ctx context.Context, fn ConsumerFn) error {
params := &sqs.ReceiveMessageInput{
QueueUrl: &c.queueURL,
MaxNumberOfMessages: 10,
// VisibilityTimeout: 30,
WaitTimeSeconds: 5,
}
for {
if err := ctx.Err(); err != nil {
return err
}
resp, err := c.sqs.ReceiveMessage(ctx, params)
if err != nil {
return err
}
for _, msg := range resp.Messages {
var wg sync.WaitGroup
wg.Add(1)
go func(msg types.Message) {
defer wg.Done()
ctx := context.Background()
event := cloudevents.New()
err := json.Unmarshal([]byte(*msg.Body), &event)
if err != nil {
_, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
zap.L().Error("Failed to delete messages", zap.Error(err))
// @TODO: Better error handling
}
}
canDelete := fn(ctx, &event)
if !canDelete {
return
}
_, err = c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
zap.L().Error("Failed to delete messages", zap.Error(err))
// @TODO: Better error handling
}
}(msg)
wg.Wait()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment