Skip to content

Instantly share code, notes, and snippets.

@dnwe
Created February 19, 2020 16:23
Show Gist options
  • Save dnwe/b75241f64e9450be269798c38740adf1 to your computer and use it in GitHub Desktop.
Save dnwe/b75241f64e9450be269798c38740adf1 to your computer and use it in GitHub Desktop.
Sarama example to subscribe to a topic and immediately commit the current end offset for the assigned partitions.
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
"github.com/Shopify/sarama"
)
// Consumer is the handler passed to the sarama consumer group. It marks
// offsets from Setup and otherwise does no message processing.
type Consumer struct {
	// client is used by Setup to look up partition offsets.
	client sarama.Client
	// ready is closed by Setup once offsets have been marked for the
	// session's claims.
	ready chan bool
}
// main connects to a single Kafka broker over SASL/TLS, joins a consumer
// group on the given topic, waits until Setup has signalled that it marked
// offsets for the claimed partitions, and then shuts everything down.
//
// Usage: <program> <broker> <topic>, with the SASL password supplied in the
// APIKEY environment variable. Setting DEBUG to any non-empty value routes
// sarama's logger to stdout.
func main() {
	apikey, ok := os.LookupEnv("APIKEY")
	if !ok {
		log.Fatal("APIKEY not set in env")
	}
	if len(os.Args) != 3 {
		fmt.Printf("usage: %s <broker> <topic>\n", os.Args[0])
		os.Exit(1)
	}
	broker := os.Args[1]
	topic := os.Args[2]

	cfg := sarama.NewConfig()
	{
		cfg.Version = sarama.MaxVersion
		// Only override the default client ID when the hostname is actually
		// available; an empty client ID would also produce a bogus group ID.
		if hostname, err := os.Hostname(); err != nil {
			log.Printf("could not determine hostname, keeping client ID %q: %v", cfg.ClientID, err)
		} else {
			cfg.ClientID = hostname
		}
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.User = "token"
		cfg.Net.SASL.Password = apikey
		cfg.Net.TLS.Enable = true
		cfg.Producer.Return.Successes = true
		cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
		if os.Getenv("DEBUG") != "" {
			sarama.Logger = log.New(os.Stdout, "[DEBUG] ", log.LstdFlags)
			sarama.PanicHandler = func(err interface{}) {
				sarama.Logger.Printf("sarama panic: %v", err)
			}
		}
	}

	brokers := []string{broker}
	groupID := cfg.ClientID + "-group"

	client, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Panicf("error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		client: client,
		ready:  make(chan bool),
	}
	// Capture the channel before the consume goroutine starts: that goroutine
	// reassigns consumer.ready after each session, so reading the field from
	// this goroutine later would be an unsynchronised concurrent access.
	ready := consumer.ready

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	group, err := sarama.NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		log.Panicf("error creating consumer group from client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume returns when the session ends, so it is called in a
			// loop until the context has been cancelled.
			if err := group.Consume(ctx, []string{topic}, &consumer); err != nil {
				log.Panicf("error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			// Setup closes the channel, so each new session needs a fresh one.
			consumer.ready = make(chan bool)
		}
	}()
	log.Printf("Created consumer group %s", groupID)

	<-ready // wait till the consumer has been set up and marked offsets
	cancel()
	wg.Wait()

	// Release the consumer group before the client it was built from.
	// A failure here is reported but does not prevent closing the client.
	if err = group.Close(); err != nil {
		log.Printf("error closing consumer group: %v", err)
	}
	if err = client.Close(); err != nil {
		log.Panicf("error closing client: %v", err)
	}
}
// Setup looks up the newest offset of every partition claimed in the session
// and marks it on the session, then closes consumer.ready to signal that the
// marking pass has finished.
//
// Offset lookups are best-effort: a partition whose lookup fails is logged
// and skipped, and the remaining partitions are still processed. Setup always
// returns nil.
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	for topic, partitions := range session.Claims() {
		for _, partition := range partitions {
			offset, err := consumer.client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				// Report the failure instead of skipping silently, so a
				// partition left unmarked is visible in the output.
				log.Printf("Skipping %s-%d: could not get newest offset: %v\n", topic, partition, err)
				continue
			}
			log.Printf("Marking offset %d on %s-%d\n", offset, topic, partition)
			session.MarkOffset(topic, partition, offset, "")
		}
	}
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}
// Cleanup is a no-op: this handler has nothing to release, so it ignores the
// session and always returns nil.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim reads no messages from the claim and returns nil immediately;
// the only work this handler does is the offset marking in Setup.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment