Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
package main
import (
var errCommand = errors.New(`command must be "describe" or "consume"`)
type Closer interface {
Close() error
func close(closer Closer) {
if closer == nil {
if err := closer.Close(); err != nil {
log.Printf("error closing: %s\n", err.Error())
func consume(addrs []string, group, topic string, partition int32, stopChan chan bool) error {
client, err := sarama.NewClient(addrs, nil)
if err != nil {
return errors.Trace(err)
defer close(client)
mgr, err := sarama.NewOffsetManagerFromClient(group, client)
if err != nil {
return errors.Trace(err)
defer close(mgr)
partMgr, err := mgr.ManagePartition(topic, partition)
if err != nil {
return errors.Trace(err)
defer close(partMgr)
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
return errors.Trace(err)
defer close(consumer)
// Finally we get to the actual object we are going to use: a partition
// consumer. We fetch the next offset from where we left off from the
// partition offset manager, and start consuming the same partition within
// the topic, starting from that offset.
offset, _ := partMgr.NextOffset()
partConsumer, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return errors.Trace(err)
defer close(partConsumer)
log.Println("Consuming messages from offset", offset, "within", topic, "partition", partition)
for {
select {
// Read messages from the consumer channel, and mark each offset after
// processing it. This persists that value up to Kafka for your consumer
// group, so if the worker is restarted, it can resume where it left off.
// The second argument to the MarkOffset function is `metadata`, which is
// an arbitrary (but relatively short) string that your consumer is
// supposed to be able to use to reconstruct where it left off. Maybe it
// points to a file on disk or something with some persisted state in it.
case msg := <-partConsumer.Messages():
log.Printf("Consumed message: %+v", msg)
partMgr.MarkOffset(msg.Offset, "")
case <-stopChan:
log.Println("Consumer stopping")
return nil
return nil
func main() {
var brokerAddrsArg string
var consumerGroup string
var topic string
var partition int
flag.StringVar(&brokerAddrsArg, "brokers", "", "Comma separated list of Kafka broker addresses to connect to (e.g.,")
flag.StringVar(&consumerGroup, "consumer-group", "test", "Name for the consumer group.")
flag.IntVar(&partition, "partition", 0, "Partition within the topic to consume.")
flag.StringVar(&topic, "topic", "test", "Kafka topic to consume.")
brokerAddrs := strings.Split(brokerAddrsArg, ",")
stopChan := make(chan bool, 1)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
stopChan <- true
if err := consume(brokerAddrs, consumerGroup, topic, int32(partition), stopChan); err != nil {
log.Fatalf("error: %+v\n", err)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment