Skip to content

Instantly share code, notes, and snippets.

@savaki
Created December 31, 2017 16:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save savaki/f5dadb594d6cf72e1b54265e4e75e9e3 to your computer and use it in GitHub Desktop.
Save savaki/f5dadb594d6cf72e1b54265e4e75e9e3 to your computer and use it in GitHub Desktop.
performance of kafka-go consumer group reader
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"strings"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/ksuid"
)
var (
brokers string
topic string
records int
certFile string
caFile string
keyFile string
)
func init() {
flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses, comma-separated")
flag.StringVar(&topic, "topic", "topic", "topic to produce to")
flag.IntVar(&records, "records", 250000, "number of records to read from kafka")
flag.StringVar(&certFile, "cert", "_cert.pem", "tls cert")
flag.StringVar(&caFile, "ca", "_ca.pem", "tls ca")
flag.StringVar(&keyFile, "key", "_key.pem", "tls key")
flag.Parse()
}
func check(err error) {
if err != nil {
log.Fatalln(err)
}
}
func tlsConfig() *tls.Config {
certPEM, err := ioutil.ReadFile(certFile)
check(err)
caPEM, err := ioutil.ReadFile(caFile)
check(err)
keyPEM, err := ioutil.ReadFile(keyFile)
check(err)
if certPEM == nil || caPEM == nil || keyPEM == nil {
panic("tls configuration not available")
}
cert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
check(err)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caPEM))
return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
}
}
func main() {
groupID := ksuid.New().String()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(brokers, ","),
Topic: topic,
GroupID: groupID,
Dialer: &kafka.Dialer{TLS: tlsConfig()},
})
defer r.Close()
ctx := context.Background()
var start time.Time
var count int
for {
_, err := r.ReadMessage(ctx)
check(err)
count++
if count == 1 {
start = time.Now()
}
if count == records {
break
}
}
elapsed := time.Now().Sub(start)
fmt.Printf("kafka-go: %v records, %v\n", records, elapsed)
}
@savaki
Copy link
Author

savaki commented Dec 31, 2017

On my desktop to a remote kafka 0.11.2, I get

kafka-go: 250000 records, 46.731983172s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment