Skip to content

Instantly share code, notes, and snippets.

Last active August 22, 2022 09:27
Show Gist options
  • Save savaki/a19dcc1e72cb5d621118fbee1db4e61f to your computer and use it in GitHub Desktop.
Save savaki/a19dcc1e72cb5d621118fbee1db4e61f to your computer and use it in GitHub Desktop.
comparison of confluent-kafka-go vs sarama-cluster consumer performance
package main
import (
// Command-line options; each is bound to a flag in init.
var (
// brokers holds the broker addresses as a single comma-separated string.
brokers string
// topic is the Kafka topic name used by the benchmark.
topic string
// records is the number of records to read from kafka before stopping.
records int
// certFile is the path to the TLS certificate (PEM).
certFile string
// caFile is the path to the TLS CA certificate (PEM).
caFile string
// keyFile is the path to the TLS private key (PEM).
keyFile string
// init registers the command-line flags and their default values.
// NOTE(review): the help text for -topic says "topic to produce to", but the
// functions in this file subscribe to and consume from the topic.
func init() {
flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses, comma-separated")
flag.StringVar(&topic, "topic", "topic", "topic to produce to")
flag.IntVar(&records, "records", 250000, "number of records to read from kafka")
flag.StringVar(&certFile, "cert", "_cert.pem", "tls cert")
flag.StringVar(&caFile, "ca", "_ca.pem", "tls ca")
flag.StringVar(&keyFile, "key", "_key.pem", "tls key")
// check is an error-handling helper that acts only when err is non-nil.
// NOTE(review): the body of the branch is not visible in this excerpt;
// presumably it aborts the program — confirm against the original gist.
func check(err error) {
if err != nil {
// main is the program entry point.
// NOTE(review): the body is not visible in this excerpt; presumably it parses
// the flags and runs confluent and saramago in turn — confirm.
func main() {
// confluent reads messages from topic with the confluent-kafka-go consumer
// and prints how many records were read and how long it took.
//
// NOTE(review): this excerpt is missing lines. The closing braces, the
// `loop:` label targeted by `break loop`, the increment of count, and the
// error check after kafka.NewConsumer are not visible here.
func confluent() {
// A newly generated ksuid is used as the consumer group ID
// (presumably so each run starts without committed offsets — confirm).
groupID := ksuid.New().String()
cm := &kafka.ConfigMap{
// NOTE(review): the keys shown as "" below have lost their names. Repeating
// the same constant key in a Go map literal is a compile error, so the real
// property names must be restored. The guesses below are unconfirmed.
// presumably "session.timeout.ms" — confirm
"": 6000,
// presumably "bootstrap.servers" — confirm
"": brokers,
// presumably "enable.auto.commit" — confirm
"": false,
// presumably "go.events.channel.enable" (consumer.Events() is read below) — confirm
"": true,
"go.application.rebalance.enable": true,
// presumably "group.id" — confirm
"": groupID,
"default.topic.config": kafka.ConfigMap{
"auto.offset.reset": "earliest",
"security.protocol": "ssl",
// presumably "ssl.ca.location" — confirm
"": caFile,
"ssl.certificate.location": certFile,
"ssl.key.location": keyFile,
consumer, err := kafka.NewConsumer(cm)
defer consumer.Close()
check(consumer.Subscribe(topic, nil))
// start is set inside the loop when count == 1; count is the number of
// messages seen so far.
var start time.Time
count := 0
for {
select {
case m, ok := <-consumer.Events():
// A closed events channel is treated as fatal.
if !ok {
panic("unexpected eof")
switch event := m.(type) {
// NOTE(review): the bodies of the AssignedPartitions and RevokedPartitions
// cases are not visible in this excerpt.
case kafka.AssignedPartitions:
case kafka.PartitionEOF:
// nop
case kafka.RevokedPartitions:
case *kafka.Message:
// The clock starts when count == 1, not at the first receive.
if count == 1 {
start = time.Now()
// Stop once the requested number of records has been counted.
if count == records {
break loop
elapsed := time.Now().Sub(start)
fmt.Printf("confluent: %v records, %v\n", count, elapsed)
// tlsConfig builds a *tls.Config from the PEM files named by certFile, caFile
// and keyFile. It panics if any of the three files yields nil contents.
//
// NOTE(review): the err values returned by ReadFile and X509KeyPair are not
// checked in the lines visible here, and the closing braces are missing.
func tlsConfig() *tls.Config {
certPEM, err := ioutil.ReadFile(certFile)
caPEM, err := ioutil.ReadFile(caFile)
keyPEM, err := ioutil.ReadFile(keyFile)
if certPEM == nil || caPEM == nil || keyPEM == nil {
panic("tls configuration not available")
// Pair the client certificate with its private key.
cert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
// NOTE(review): no visible line adds caPEM to caCertPool, so as shown the
// pool is empty — confirm whether an AppendCertsFromPEM call was lost.
caCertPool := x509.NewCertPool()
return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
// NOTE(review): InsecureSkipVerify disables verification of the server's
// certificate chain and host name; acceptable only for a benchmark.
InsecureSkipVerify: true,
// saramago reads messages from topic with the sarama-cluster consumer over
// TLS and prints how many records were read and how long it took.
//
// NOTE(review): this excerpt is missing lines. The closing braces, the
// `loop:` label targeted by `break loop`, the increment of count, and the
// bodies of the `if err != nil` and `case <-signals` branches are not
// visible here.
func saramago() {
// Code from the sarama-cluster same page with minimal changes for TLS and broker/topic
// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
// Start from the oldest available offset.
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig()
// init consumer
// A newly generated ksuid is used as the consumer group ID.
groupID := ksuid.New().String()
// brokers is split on commas into the individual broker addresses.
consumer, err := cluster.NewConsumer(strings.Split(brokers, ","), groupID, []string{topic}, config)
if err != nil {
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
// Errors are logged from a separate goroutine.
go func() {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
// consume notifications
// Notifications are logged from a separate goroutine.
go func() {
for n := range consumer.Notifications() {
log.Printf("Notification: %v\n", n)
// start is set inside the loop when count == 1; count is the number of
// messages seen so far.
var start time.Time
var count int
// consume messages, watch signals
for {
select {
case _, ok := <-consumer.Messages():
// A closed messages channel is treated as fatal.
if !ok {
panic("messages channel unexpectedly closed")
// The clock starts when count == 1, not at the first receive.
if count == 1 {
start = time.Now()
// Stop once the requested number of records has been counted.
if count == records {
break loop
case <-signals:
elapsed := time.Now().Sub(start)
fmt.Printf("sarama-cluster: %v records, %v\n", count, elapsed)
Copy link

savaki commented Dec 31, 2017

From my desktop with TLS:

confluent: 250000 records, 34.84177593s
sarama-cluster: 250000 records, 2m20.614963038s

Net-net, confluent was about 4x faster on the read (34.8s vs. 2m20.6s).

Copy link

polyma commented Mar 31, 2018

Useful. Thanks!

Copy link

Really useful, thanks!

Copy link

I removed the SSL certs and ran the same benchmark, locally with a single kafka instance:

confluent: 250000 records, 1.185442044s
sarama-cluster: 250000 records, 691.240535ms

Copy link

cztchoice commented Jun 15, 2019

Removed the SSL certs and ran the same benchmark, consuming from a production Kafka cluster (version 0.10 or 1.1.0):

confluent: 250000 records, 2.45766098s
sarama-cluster: 250000 records, 745.87133ms

When the message size is larger:

confluent: 250000 records, 4.812173908s
sarama-cluster: 250000 records, 2.349170585s

But when I change the consumer to consume from the latest offset, the result is:

confluent: 250000 records, 3.445361514s
sarama-cluster: 250000 records, 4.127447438s

Topic with large messages:

confluent: 250000 records, 7.563664321s
sarama-cluster: 250000 records, 8.164795679s

In this case, confluent-kafka-go outperforms sarama most of the time (9 out of 10 runs).

The exception is a topic whose average message size is 200 bytes, much smaller than the other topics:

confluent: 250000 records, 1.867976538s
sarama-cluster: 250000 records, 1.230283069s

Copy link

wrfly commented Jan 14, 2021

single local Kafka (latest confluent version), consumer without SSL

module x

go 1.15

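require (
	github.com/Shopify/sarama v1.27.2
	github.com/bsm/sarama-cluster v2.1.15+incompatible
	github.com/confluentinc/confluent-kafka-go v1.5.2
	github.com/segmentio/ksuid v1.0.3
)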

single message size: 7000+ chars

./x -records 10000
confluent: 10000 records, 115.327572ms
sarama-cluster: 10000 records, 387.687596ms

./x -records 10000
confluent: 10000 records, 122.492281ms
sarama-cluster: 10000 records, 412.542721ms

./x -records 10000
confluent: 10000 records, 167.982983ms
sarama-cluster: 10000 records, 381.887461ms

./x -records 25000
confluent: 25000 records, 359.892406ms
sarama-cluster: 25000 records, 1.017735782s

./x -records 25000
confluent: 25000 records, 288.432792ms
sarama-cluster: 25000 records, 1.045038765s

./x -records 25000
confluent: 25000 records, 393.244614ms
sarama-cluster: 25000 records, 1.004096071s

With Kafka 1.0.0 (7 brokers):

./x -brokers -records 50000
confluent: 50000 records, 203.889724ms
2021/01/14 17:39:23 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:26 Notification: &{rebalance OK ...
sarama-cluster: 50000 records, 287.838027ms

./x -brokers -records 100000
confluent: 100000 records, 325.483269ms
2021/01/14 17:39:41 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:44 Notification: &{rebalance OK ...
sarama-cluster: 100000 records, 720.05731ms

./x -brokers -records 100000
confluent: 100000 records, 403.692593ms
2021/01/14 17:39:50 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:53 Notification: &{rebalance OK ...
sarama-cluster: 100000 records, 718.205965ms

Copy link

wrfly commented Jan 15, 2021

kafka cluster 2.5.0

confluent: 250000 records, 3.239198631s
2021/01/15 14:24:21 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:21 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.17630413s

confluent: 250000 records, 2.619734736s
2021/01/15 14:24:30 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:30 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.077333905s

confluent: 250000 records, 3.021209597s
2021/01/15 14:24:38 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:38 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 1.941509797s

confluent: 250000 records, 2.777872268s
2021/01/15 14:24:50 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:50 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.027456435s

confluent: 250000 records, 2.324268327s
2021/01/15 14:24:56 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:56 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.140472828s

confluent: 250000 records, 2.448384773s
2021/01/15 14:25:01 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:25:01 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.074605723s

After enabling auto commit:

./compare -records 250000
confluent: 250000 records, 8.818076893s
2021/01/15 14:37:21 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:37:21 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 9.940272815s

./compare -records 500000
confluent: 500000 records, 25.078906199s
2021/01/15 14:38:08 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:38:08 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 500000 records, 29.160564356s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment