Skip to content

Instantly share code, notes, and snippets.

@devplayg
Created June 7, 2019 04:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devplayg/3a413989b80ea83db6aa076371357c85 to your computer and use it in GitHub Desktop.
Save devplayg/3a413989b80ea83db6aa076371357c85 to your computer and use it in GitHub Desktop.
sarama/examples/consumergroup/main.go
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/Shopify/sarama"
)
// Sarma configuration options
var (
brokers = ""
version = ""
group = ""
topics = ""
oldest = true
verbose = false
)
func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
flag.Parse()
if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}
if len(topics) == 0 {
panic("no topics given to be consumed, please set the -topics flag")
}
if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}
func main() {
log.Println("Starting a new Sarama consumer")
if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic(err)
}
/**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
config := sarama.NewConfig()
config.Version = version
if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{}
ctx := context.Background()
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
panic(err)
}
go func() {
for {
consumer.ready = make(chan bool, 0)
err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
if err != nil {
panic(err)
}
}
}()
<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
<-sigterm // Await a sigterm signal before safely closing the consumer
err = client.Close()
if err != nil {
panic(err)
}
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment