Created
October 19, 2019 18:21
-
-
Save chris001177/100f343162910d971c325f245399741b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"strings" | |
"sync" | |
"syscall" | |
"github.com/Shopify/sarama" | |
) | |
// Command-line options for the Sarama consumer. The values declared here are
// placeholders only: init binds each variable to a flag and the flag defaults
// overwrite them when the flags are registered.
var (
	brokers  string
	version  string
	group    string
	topics   string
	assignor string
	oldest   = true
	verbose  bool
)
func init() { | |
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list") | |
flag.StringVar(&group, "group", "", "Kafka consumer group definition") | |
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version") | |
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list") | |
flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)") | |
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest") | |
flag.BoolVar(&verbose, "verbose", false, "Sarama logging") | |
flag.Parse() | |
if len(brokers) == 0 { | |
panic("no Kafka bootstrap brokers defined, please set the -brokers flag") | |
} | |
if len(topics) == 0 { | |
panic("no topics given to be consumed, please set the -topics flag") | |
} | |
if len(group) == 0 { | |
panic("no Kafka consumer group defined, please set the -group flag") | |
} | |
} | |
func main() { | |
log.Println("Starting a new Sarama consumer") | |
if verbose { | |
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) | |
} | |
version, err := sarama.ParseKafkaVersion(version) | |
if err != nil { | |
log.Panicf("Error parsing Kafka version: %v", err) | |
} | |
/** | |
* Construct a new Sarama configuration. | |
* The Kafka cluster version has to be defined before the consumer/producer is initialized. | |
*/ | |
config := sarama.NewConfig() | |
config.Version = version | |
switch assignor { | |
case "sticky": | |
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky | |
case "roundrobin": | |
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin | |
case "range": | |
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange | |
default: | |
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) | |
} | |
if oldest { | |
config.Consumer.Offsets.Initial = sarama.OffsetOldest | |
} | |
/** | |
* Setup a new Sarama consumer group | |
*/ | |
consumer := Consumer{ | |
ready: make(chan bool), | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) | |
if err != nil { | |
log.Panicf("Error creating consumer group client: %v", err) | |
} | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil { | |
log.Panicf("Error from consumer: %v", err) | |
} | |
// check if context was cancelled, signaling that the consumer should stop | |
if ctx.Err() != nil { | |
return | |
} | |
consumer.ready = make(chan bool) | |
} | |
}() | |
<-consumer.ready // Await till the consumer has been set up | |
log.Println("Sarama consumer up and running!...") | |
sigterm := make(chan os.Signal, 1) | |
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) | |
select { | |
case <-ctx.Done(): | |
log.Println("terminating: context cancelled") | |
case <-sigterm: | |
log.Println("terminating: via signal") | |
} | |
cancel() | |
wg.Wait() | |
if err = client.Close(); err != nil { | |
log.Panicf("Error closing client: %v", err) | |
} | |
} | |
// Consumer is the handler passed to the Sarama consumer group. It carries a
// single channel, ready, which Setup closes to announce that a session has
// started.
type Consumer struct {
	// ready is closed by Setup; main replaces it with a fresh channel before
	// each new call to Consume.
	ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim | |
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { | |
// Mark the consumer as ready | |
close(consumer.ready) | |
return nil | |
} | |
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited | |
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { | |
return nil | |
} | |
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). | |
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
// NOTE: | |
// Do not move the code below to a goroutine. | |
// The `ConsumeClaim` itself is called within a goroutine, see: | |
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 | |
for message := range claim.Messages() { | |
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) | |
session.MarkMessage(message, "") | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment