Skip to content

Instantly share code, notes, and snippets.

@acastro2
Last active July 13, 2024 03:27
Show Gist options
  • Save acastro2/8ad546ccff0c3e82aa5b5e867c086c80 to your computer and use it in GitHub Desktop.
Save acastro2/8ad546ccff0c3e82aa5b5e867c086c80 to your computer and use it in GitHub Desktop.
Kafka retries with Go
package main
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/acastro2/go-retries/kafka_retry_dlq"
"github.com/cenkalti/backoff/v4"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
type exampleHandler struct {
}
func (h *exampleHandler) Process(ctx context.Context, msg kafka.Message) error {
// Parsing kafka message as json and log it to console
var jsonData map[string]interface{}
err := json.Unmarshal(msg.Value, &jsonData)
fmt.Println(jsonData)
// Return error if message processing failed
if rand.Intn(100) < 25 {
return fmt.Errorf("Throw random error to simulate processing failure and test retry.")
}
return err
}
func (h *exampleHandler) MoveToDLQ(ctx context.Context, msg kafka.Message) {
// Implement logic to move message to DLQ
fmt.Println("Moving message to DLQ")
}
func main() {
ctx := context.Background()
readerConfig := kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "<consumer-group-id>",
Topic: "<topic>",
MaxBytes: 10e6, // 10MB
WatchPartitionChanges: true,
Dialer: &kafka.Dialer{
SASLMechanism: plain.Mechanism{
Username: "<username>",
Password: "<password>",
},
TLS: &tls.Config{
MinVersion: tls.VersionTLS12,
},
},
}
reader := kafka.NewReader(readerConfig)
defer reader.Close()
backoff := backoff.NewExponentialBackOff()
backoff.MaxElapsedTime = time.Minute * 5
options := kafka_retry_dlq.ConsumerWithRetryOptions{
Reader: reader,
Backoff: backoff,
MaxRetries: 3,
Handler: &exampleHandler{},
RetryQueue: make(chan kafka.Message, 1000),
}
kafka_retry_dlq.NewConsumerWithRetry(ctx, &options)
// Wait for messages to be processed
select {}
}
package kafka_retry_dlq
import (
"context"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/segmentio/kafka-go"
)
type ProcessRetryHandler interface {
Process(context.Context, kafka.Message) error
MoveToDLQ(context.Context, kafka.Message)
}
type ConsumerWithRetryOptions struct {
Handler ProcessRetryHandler
Reader *kafka.Reader
MaxRetries int
RetryQueue chan kafka.Message
Backoff backoff.BackOff
}
// NewConsumerWithRetry creates a new kafka consumer with retry mechanism
func NewConsumerWithRetry(ctx context.Context, options *ConsumerWithRetryOptions) {
go func() {
for {
select {
case msg := <-options.RetryQueue:
retries := 1
for {
fmt.Printf("Retry %v message %v\n", retries, msg.Key)
if retries >= options.MaxRetries {
options.Handler.MoveToDLQ(ctx, msg)
break
}
if err := options.Handler.Process(ctx, msg); err != nil {
fmt.Printf("Error processing message, retrying: %v\n", err)
time.Sleep(options.Backoff.NextBackOff())
retries++
continue
}
break
}
}
}
}()
for {
msg, err := options.Reader.ReadMessage(ctx)
if err != nil {
fmt.Println("Error reading message: ", err)
return
}
if err := options.Handler.Process(ctx, msg); err != nil {
options.RetryQueue <- msg
}
}
}
@HosseyNJF
Copy link

Nice example!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment