Created
February 19, 2020 16:23
-
-
Save dnwe/b75241f64e9450be269798c38740adf1 to your computer and use it in GitHub Desktop.
Sarama example to subscribe to a topic and immediately commit the current end offset for the assigned partitions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"os" | |
"sync" | |
"github.com/Shopify/sarama" | |
) | |
type Consumer struct { | |
client sarama.Client | |
ready chan bool | |
} | |
func main() { | |
apikey, ok := os.LookupEnv("APIKEY") | |
if !ok { | |
log.Fatal("APIKEY not set in env") | |
} | |
if len(os.Args) != 3 { | |
fmt.Printf("usage: %s <broker> <topic>\n", os.Args[0]) | |
os.Exit(1) | |
} | |
broker := os.Args[1] | |
topic := os.Args[2] | |
cfg := sarama.NewConfig() | |
{ | |
cfg.Version = sarama.MaxVersion | |
cfg.ClientID, _ = os.Hostname() | |
cfg.Net.SASL.Enable = true | |
cfg.Net.SASL.User = "token" | |
cfg.Net.SASL.Password = apikey | |
cfg.Net.TLS.Enable = true | |
cfg.Producer.Return.Successes = true | |
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest | |
if os.Getenv("DEBUG") != "" { | |
sarama.Logger = log.New(os.Stdout, "[DEBUG] ", log.LstdFlags) | |
sarama.PanicHandler = func(err interface{}) { | |
sarama.Logger.Printf("sarama panic: %v", err) | |
} | |
} | |
} | |
brokers := []string{broker} | |
groupID := cfg.ClientID + "-group" | |
client, err := sarama.NewClient(brokers, cfg) | |
if err != nil { | |
log.Panicf("error creating consumer group client: %v", err) | |
} | |
consumer := Consumer{ | |
client: client, | |
ready: make(chan bool), | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
group, err := sarama.NewConsumerGroupFromClient(groupID, client) | |
if err != nil { | |
log.Panicf("error creating consumer group from client: %v", err) | |
} | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
if err := group.Consume(ctx, []string{topic}, &consumer); err != nil { | |
log.Panicf("error from consumer: %v", err) | |
} | |
if ctx.Err() != nil { | |
return | |
} | |
consumer.ready = make(chan bool) | |
} | |
}() | |
log.Printf("Created consumer group %s", groupID) | |
<-consumer.ready // wait till the consumer has been set up and marked offsets | |
cancel() | |
wg.Wait() | |
if err = client.Close(); err != nil { | |
log.Panicf("error closing client: %v", err) | |
} | |
} | |
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error { | |
for topic, partitions := range session.Claims() { | |
for _, partition := range partitions { | |
offset, err := consumer.client.GetOffset(topic, partition, sarama.OffsetNewest) | |
if err != nil { | |
continue | |
} | |
log.Printf("Marking offset %d on %s-%d\n", offset, topic, partition) | |
session.MarkOffset(topic, partition, offset, "") | |
} | |
} | |
// Mark the consumer as ready | |
close(consumer.ready) | |
return nil | |
} | |
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { | |
return nil | |
} | |
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment