Skip to content

Instantly share code, notes, and snippets.

@kaustavha
Created Feb 20, 2016
Embed
What would you like to do?
Kafka Go benchmarks

You'll need a version of Go newer than 1.3, since Sarama will fail to compile on anything older. Also remember to install Sarama using `go get github.com/Shopify/sarama`, and finally build the binary.

Usage:

./warehouser -zookeeper "localhost:2181"

Results:

2016/01/13 22:25:52 Connected to <zk>
2016/01/13 22:25:52 Authenticated: id=311369879628284156, timeout=8000
2016/01/13 22:25:52 Recv loop terminated: err=EOF
2016/01/13 22:25:52 Send loop terminated: err=<nil>
2016/01/13 22:26:12 Consumed: 587793
2016/01/13 22:26:12 Elapsed Time: 20
2016/01/13 22:26:12 TPS: 29389
2016/01/13 22:26:16 Connected to <zk>
2016/01/13 22:26:16 Authenticated: id=311369879628284157, timeout=8000
2016/01/13 22:26:17 Recv loop terminated: err=EOF
2016/01/13 22:26:17 Send loop terminated: err=<nil>
2016/01/13 22:26:37 Consumed: 576012
2016/01/13 22:26:37 Elapsed Time: 20
2016/01/13 22:26:37 TPS: 28800
2016/01/13 22:26:45 Connected to <zk>
2016/01/13 22:26:45 Authenticated: id=311369879628284158, timeout=8000
2016/01/13 22:26:45 Recv loop terminated: err=EOF
2016/01/13 22:26:45 Send loop terminated: err=<nil>
2016/01/13 22:27:05 Consumed: 568915
2016/01/13 22:27:05 Elapsed Time: 20
2016/01/13 22:27:05 TPS: 28445
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
)
// Command-line flags and the shared logger used by the benchmark.
//
// Note on usage strings: the flag package treats the first back-quoted word
// in a usage string as the placeholder name of the flag's value, so literal
// example values are written in single quotes instead.
var (
	// brokerList defaults to the KAFKA_PEERS environment variable.
	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
	// topic has no default; main rejects an empty value.
	topic = flag.String("topic", "", "REQUIRED: the topic to consume")
	// partitions is either the literal "all" or a comma-separated list of partition numbers.
	partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
	// offset selects where consumption starts; main accepts only "oldest" or "newest".
	offset = flag.String("offset", "newest", "The offset to start with. Can be 'oldest' or 'newest'")
	// verbose routes sarama's internal logging to logger.
	verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
	// bufferSize is the capacity of the channel that collects messages from all partitions.
	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
	// duration is multiplied by time.Second in main, i.e. it is a number of seconds.
	duration = flag.Int("duration", 20, "The duration of the test")
	// logger writes timestamped lines to standard error.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)
// main runs a fixed-duration Kafka consumption benchmark.
//
// It validates the command-line flags, starts one partition consumer for every
// partition returned by getPartitions, and funnels every received message into
// a single counting goroutine. Consumption stops after -duration seconds or on
// an interrupt signal, whichever happens first. The number of messages read,
// the elapsed wall-clock time and the resulting throughput are then logged.
func main() {
	flag.Parse()

	if *brokerList == "" {
		printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
	}
	if *topic == "" {
		printUsageErrorAndExit("-topic is required")
	}
	if *verbose {
		sarama.Logger = logger
	}

	var initialOffset int64
	switch *offset {
	case "oldest":
		initialOffset = sarama.OffsetOldest
	case "newest":
		initialOffset = sarama.OffsetNewest
	default:
		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
	}

	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
	if err != nil {
		printErrorAndExit(69, "Failed to start consumer: %s", err)
	}

	partitionList, err := getPartitions(c)
	if err != nil {
		printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
	}

	var (
		messages  = make(chan *sarama.ConsumerMessage, *bufferSize)
		closing   = make(chan struct{})
		closeOnce sync.Once
		wg        sync.WaitGroup
	)

	// shutdown may be triggered by both the signal handler and the duration
	// timer; closing an already closed channel panics, so close exactly once.
	shutdown := func() {
		closeOnce.Do(func() { close(closing) })
	}

	go func() {
		signals := make(chan os.Signal, 1)
		// os.Kill (SIGKILL) can never be delivered to the program, so only
		// the interrupt signal is registered.
		signal.Notify(signals, os.Interrupt)
		<-signals
		logger.Println("Initiating shutdown of consumer...")
		shutdown()
	}()

	start := time.Now()
	for _, partition := range partitionList {
		pc, err := c.ConsumePartition(*topic, partition, initialOffset)
		if err != nil {
			printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
		}

		// Ask the partition consumer to stop once shutdown is requested.
		go func(pc sarama.PartitionConsumer) {
			<-closing
			pc.AsyncClose()
		}(pc)

		// Forward this partition's messages until its channel is closed.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for message := range pc.Messages() {
				messages <- message
			}
		}(pc)
	}

	// Stop the benchmark after the configured number of seconds.
	go func() {
		time.Sleep(time.Duration(*duration) * time.Second)
		shutdown()
	}()

	// Count messages until the channel is closed, then report the total.
	totalCountChan := make(chan uint64)
	go func() {
		var count uint64
		for range messages {
			count++
		}
		totalCountChan <- count
	}()

	logger.Println("before wg.Wait")
	wg.Wait()
	end := time.Now()
	logger.Println("after wg.Wait")

	// All forwarders have returned, so nothing sends on messages any more.
	close(messages)
	consumed := <-totalCountChan

	logger.Println("Done consuming topic", *topic)
	logger.Printf("Read %d messages\n", consumed)

	// Divide by fractional seconds: truncating to whole seconds first
	// overstates throughput and divides by zero for runs shorter than 1s.
	seconds := end.Sub(start).Seconds()
	var tps float64
	if seconds > 0 {
		tps = float64(consumed) / seconds
	}
	log.Printf("Consumed: %v\n", consumed)
	log.Printf("Elapsed Time: %v\n", uint64(seconds))
	log.Printf("TPS: %.0f\n", tps)

	if err := c.Close(); err != nil {
		logger.Println("Failed to close consumer: ", err)
	}
}
// getPartitions resolves the -partitions flag into the partition IDs to consume.
//
// When the flag is "all", the result of c.Partitions for -topic is returned
// unchanged. Otherwise the flag is parsed as a comma-separated list of base-10
// numbers; whitespace around each number is ignored so that "0, 1" is accepted.
//
// An error is returned when c.Partitions fails or when an entry is not a valid
// 32-bit integer.
func getPartitions(c sarama.Consumer) ([]int32, error) {
	if *partitions == "all" {
		return c.Partitions(*topic)
	}

	fields := strings.Split(*partitions, ",")
	pList := make([]int32, 0, len(fields))
	for _, field := range fields {
		// bitSize 32 makes ParseInt reject values that do not fit in int32.
		val, err := strconv.ParseInt(strings.TrimSpace(field), 10, 32)
		if err != nil {
			return nil, err
		}
		pList = append(pList, int32(val))
	}
	return pList, nil
}
// printErrorAndExit writes the formatted message, prefixed with "ERROR: " and
// followed by a blank line, to standard error and then terminates the process
// with the given exit code. It never returns.
func printErrorAndExit(code int, format string, values ...interface{}) {
	msg := fmt.Sprintf(format, values...)
	stderr := os.Stderr

	fmt.Fprintln(stderr, "ERROR: "+msg)
	fmt.Fprintln(stderr)

	os.Exit(code)
}
// printUsageErrorAndExit reports a command-line usage mistake: it writes the
// formatted message prefixed with "ERROR: " to standard error, followed by a
// blank line and the defaults of every registered flag, and then terminates
// the process with exit code 64. It never returns.
func printUsageErrorAndExit(format string, values ...interface{}) {
	const usageExitCode = 64

	msg := fmt.Sprintf(format, values...)
	stderr := os.Stderr

	fmt.Fprintln(stderr, "ERROR: "+msg)
	fmt.Fprintln(stderr)
	fmt.Fprintln(stderr, "Available command line options:")
	flag.PrintDefaults()

	os.Exit(usageExitCode)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment