Skip to content

Instantly share code, notes, and snippets.

@nilsmagnus
Created February 15, 2017 08:04
Show Gist options
  • Save nilsmagnus/4b582f9a36279bff5f8f9d453f8fb9c4 to your computer and use it in GitHub Desktop.
Save nilsmagnus/4b582f9a36279bff5f8f9d453f8fb9c4 to your computer and use it in GitHub Desktop.
Example of Go consuming from Kafka, using the Shopify/sarama library
package main
import (
"fmt"
"github.com/Shopify/sarama"
"os"
"os/signal"
"strings"
)
// main connects to the Kafka broker at localhost:9092, starts consuming every
// topic via consume, and prints each received message. It stops on the first
// consumer error or when the process receives an interrupt signal, then prints
// how many messages were processed.
func main() {
	config := sarama.NewConfig()
	config.ClientID = "go-kafka-consumer"
	// Errors are read from the channel returned by consume, so ask sarama to
	// return them.
	config.Consumer.Return.Errors = true
	brokers := []string{"localhost:9092"}

	// Create new consumer
	master, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}
	// NOTE(review): the partition consumers opened by consume are never closed
	// before this Close runs — confirm against the sarama Consumer.Close contract.
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	topics, err := master.Topics()
	if err != nil {
		panic(err)
	}
	messages, consumerErrors := consume(topics, master)

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Count how many messages were processed. The loop runs on the main
	// goroutine, so the counter needs no synchronization.
	msgCount := 0

loop:
	for {
		select {
		case msg := <-messages:
			msgCount++
			fmt.Println("Received messages", string(msg.Key), string(msg.Value))
		case consumerError := <-consumerErrors:
			// Errors are not messages, so they do not bump msgCount. The
			// partition is printed as a number; string(int32) would yield the
			// Unicode code point instead of its decimal representation.
			fmt.Println("Received consumerError ", consumerError.Topic, consumerError.Partition, consumerError.Err)
			break loop
		case <-signals:
			fmt.Println("Interrupt is detected")
			break loop
		}
	}

	fmt.Println("Processed", msgCount, "messages")
}
// consume opens a partition consumer on the first partition of every topic in
// topics (skipping any topic whose name contains "__consumer_offsets"),
// starting from the oldest available offset, and fans all messages and errors
// into the two returned unbuffered channels.
//
// It panics if the partitions of a topic cannot be listed or if the partition
// consumer cannot be created. Topics that report no partitions are skipped.
//
// The forwarding goroutines run until the process exits; the returned channels
// are never closed.
func consume(topics []string, master sarama.Consumer) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError) {
	consumers := make(chan *sarama.ConsumerMessage)
	errors := make(chan *sarama.ConsumerError)
	for _, topic := range topics {
		if strings.Contains(topic, "__consumer_offsets") {
			continue
		}
		partitions, err := master.Partitions(topic)
		if err != nil {
			fmt.Printf("Topic %v: cannot list partitions\n", topic)
			panic(err)
		}
		if len(partitions) == 0 {
			// Nothing to consume; indexing partitions[0] would panic.
			fmt.Println(" Skipping topic without partitions ", topic)
			continue
		}
		// This only consumes the first listed partition; you would probably
		// want to consume all partitions.
		consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("Topic %v Partitions: %v\n", topic, partitions)
			panic(err)
		}
		fmt.Println(" Start consuming topic ", topic)
		go func(topic string, consumer sarama.PartitionConsumer) {
			for {
				select {
				case consumerError := <-consumer.Errors():
					errors <- consumerError
					fmt.Println("consumerError: ", consumerError.Err)
				case msg := <-consumer.Messages():
					consumers <- msg
					// Print the payload as text rather than as a raw byte slice.
					fmt.Println("Got message on topic ", topic, string(msg.Value))
				}
			}
		}(topic, consumer)
	}
	return consumers, errors
}
@cikupin
Copy link

cikupin commented Nov 20, 2020

Guys, still no solution for this commit case?

@d1egoaz
Copy link

d1egoaz commented Feb 11, 2021

You should use a consumer group https://github.com/Shopify/sarama/tree/master/examples/consumergroup if you're interested in continuing consuming from the previous position.

There are some specific cases for the low-level consumer mentioned on this gist, which might or might not work for you; most people should use the sarama consumer group to let sarama manage the offsets, claims, consumer group rebalances, etc.

The low-level consumer does not create a consumer group, hence the Kafka server is unaware of the client position.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment