package main | |
import ( | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"strings" | |
"github.com/Shopify/sarama" | |
"github.com/davecheney/errors" | |
) | |
// errCommand reports that an unsupported command name was supplied; per its
// message, the only accepted commands are "describe" and "consume".
//
// NOTE(review): nothing in the visible part of this file references this
// value — confirm whether it is still needed.
var errCommand = errors.New(`command must be "describe" or "consume"`)
// Closer is the minimal interface accepted by the close helper: any value
// with a Close method that returns an error.
type Closer interface {
	// Close returns a non-nil error when closing fails.
	Close() error
}
func close(closer Closer) { | |
if closer == nil { | |
return | |
} | |
if err := closer.Close(); err != nil { | |
log.Printf("error closing: %s\n", err.Error()) | |
} | |
} | |
func consume(addrs []string, group, topic string, partition int32, stopChan chan bool) error { | |
client, err := sarama.NewClient(addrs, nil) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(client) | |
mgr, err := sarama.NewOffsetManagerFromClient(group, client) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(mgr) | |
partMgr, err := mgr.ManagePartition(topic, partition) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(partMgr) | |
consumer, err := sarama.NewConsumerFromClient(client) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(consumer) | |
// Finally we get to the actual object we are going to use: a partition | |
// consumer. We fetch the next offset from where we left off from the | |
// partition offset manager, and start consuming the same partition within | |
// the topic, starting from that offset. | |
offset, _ := partMgr.NextOffset() | |
partConsumer, err := consumer.ConsumePartition(topic, partition, offset) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(partConsumer) | |
log.Println("Consuming messages from offset", offset, "within", topic, "partition", partition) | |
for { | |
select { | |
// Read messages from the consumer channel, and mark each offset after | |
// processing it. This persists that value up to Kafka for your consumer | |
// group, so if the worker is restarted, it can resume where it left off. | |
// | |
// The second argument to the MarkOffset function is `metadata`, which is | |
// an arbitrary (but relatively short) string that your consumer is | |
// supposed to be able to use to reconstruct where it left off. Maybe it | |
// points to a file on disk or something with some persisted state in it. | |
case msg := <-partConsumer.Messages(): | |
log.Printf("Consumed message: %+v", msg) | |
partMgr.MarkOffset(msg.Offset, "") | |
case <-stopChan: | |
log.Println("Consumer stopping") | |
return nil | |
} | |
} | |
return nil | |
} | |
func main() { | |
var brokerAddrsArg string | |
var consumerGroup string | |
var topic string | |
var partition int | |
flag.StringVar(&brokerAddrsArg, "brokers", "127.0.0.1:9092", "Comma separated list of Kafka broker addresses to connect to (e.g. 127.0.0.1:9092,127.0.0.1:9093).") | |
flag.StringVar(&consumerGroup, "consumer-group", "test", "Name for the consumer group.") | |
flag.IntVar(&partition, "partition", 0, "Partition within the topic to consume.") | |
flag.StringVar(&topic, "topic", "test", "Kafka topic to consume.") | |
flag.Parse() | |
brokerAddrs := strings.Split(brokerAddrsArg, ",") | |
stopChan := make(chan bool, 1) | |
go func() { | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt) | |
<-c | |
stopChan <- true | |
}() | |
if err := consume(brokerAddrs, consumerGroup, topic, int32(partition), stopChan); err != nil { | |
log.Fatalf("error: %+v\n", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment