Skip to content

Instantly share code, notes, and snippets.

@savaki
Last active August 22, 2022 09:27
Show Gist options
  • Star 22 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save savaki/a19dcc1e72cb5d621118fbee1db4e61f to your computer and use it in GitHub Desktop.
Save savaki/a19dcc1e72cb5d621118fbee1db4e61f to your computer and use it in GitHub Desktop.
comparison of confluent-kafka-go vs sarama-cluster consumer performance
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/ksuid"
)
var (
	// Connection and workload settings, populated from command-line flags in init.
	brokers, topic string
	records        int

	// Paths to the PEM-encoded TLS certificate, CA and private-key files.
	certFile, caFile, keyFile string
)
// init registers the benchmark's command-line flags and parses them right
// away, so the package-level settings are populated before main runs.
//
// NOTE(review): calling flag.Parse from init runs before `go test` registers
// its -test.* flags, so a test binary for this package would reject them.
// Consider moving flag.Parse into main.
func init() {
	stringFlags := []struct {
		dst              *string
		name, def, usage string
	}{
		{&brokers, "brokers", "localhost:9092", "broker addresses, comma-separated"},
		{&topic, "topic", "topic", "topic to produce to"},
		{&certFile, "cert", "_cert.pem", "tls cert"},
		{&caFile, "ca", "_ca.pem", "tls ca"},
		{&keyFile, "key", "_key.pem", "tls key"},
	}
	for _, sf := range stringFlags {
		flag.StringVar(sf.dst, sf.name, sf.def, sf.usage)
	}
	flag.IntVar(&records, "records", 250000, "number of records to read from kafka")

	flag.Parse()
}
// check terminates the program via log.Fatalln when err is non-nil.
// A nil error is a no-op.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatalln(err)
}
// main runs the confluent-kafka-go benchmark first and then the
// sarama-cluster benchmark, sequentially.
func main() {
	for _, bench := range []func(){confluent, saramago} {
		bench()
	}
}
// confluent measures how long confluent-kafka-go takes to consume `records`
// messages from `topic` over TLS.
//
// It joins a freshly generated (random) consumer group with
// auto.offset.reset=earliest, and auto-commit is disabled. The clock starts
// when the first message arrives, so group join and partition assignment are
// excluded from the printed duration.
//
// Setup and rebalance failures are fatal (via check). A closed events channel
// or an unrecognised event type panics. Error events are logged and the run
// continues.
func confluent() {
	groupID := ksuid.New().String()
	cm := &kafka.ConfigMap{
		"session.timeout.ms":              6000,
		"metadata.broker.list":            brokers,
		"enable.auto.commit":              false,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"group.id":                        groupID,
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
		"security.protocol":               "ssl",
		"ssl.ca.location":                 caFile,
		"ssl.certificate.location":        certFile,
		"ssl.key.location":                keyFile,
	}
	consumer, err := kafka.NewConsumer(cm)
	check(err)
	defer consumer.Close()

	check(consumer.Subscribe(topic, nil))

	var start time.Time
	count := 0
	// Testing the target in the loop condition (rather than with == after an
	// increment) means a non-positive -records ends immediately instead of
	// spinning forever.
	for count < records {
		ev, ok := <-consumer.Events()
		if !ok {
			panic("unexpected eof")
		}
		switch event := ev.(type) {
		case kafka.AssignedPartitions:
			// go.application.rebalance.enable=true makes the application
			// responsible for applying the assignment.
			check(consumer.Assign(event.Partitions))
		case kafka.PartitionEOF:
			// nop
		case kafka.RevokedPartitions:
			check(consumer.Unassign())
		case *kafka.Message:
			if count == 0 {
				start = time.Now()
			}
			count++
		case kafka.Error:
			// NOTE(review): error events are commonly transient (e.g. a
			// dropped broker connection); log instead of aborting the run.
			log.Printf("confluent: %v\n", event)
		default:
			panic(ev)
		}
	}

	// No messages means no start time; report a zero duration in that case.
	var elapsed time.Duration
	if count > 0 {
		elapsed = time.Since(start)
	}
	fmt.Printf("confluent: %v records, %v\n", count, elapsed)
}
// tlsConfig builds the client TLS configuration for the sarama-cluster
// benchmark from the PEM files named by the -cert, -ca and -key flags.
//
// Unreadable files and an invalid certificate/key pair are fatal (via check).
// An empty PEM file, or a CA file from which no certificate can be parsed,
// panics.
func tlsConfig() *tls.Config {
	certPEM, err := ioutil.ReadFile(certFile)
	check(err)
	caPEM, err := ioutil.ReadFile(caFile)
	check(err)
	keyPEM, err := ioutil.ReadFile(keyFile)
	check(err)
	// On success ReadFile hands back a non-nil slice even for an empty file,
	// so emptiness must be tested by length rather than against nil.
	if len(certPEM) == 0 || len(caPEM) == 0 || len(keyPEM) == 0 {
		panic("tls configuration not available")
	}

	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	check(err)

	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caPEM) {
		panic(fmt.Sprintf("no CA certificates could be parsed from %q", caFile))
	}

	// NOTE(review): InsecureSkipVerify disables server certificate and
	// hostname verification entirely, so RootCAs is never consulted. It is
	// kept for parity with the original benchmark; do not copy into
	// production code.
	return &tls.Config{
		Certificates:       []tls.Certificate{cert},
		RootCAs:            caCertPool,
		InsecureSkipVerify: true,
	}
}
// saramago measures how long sarama-cluster takes to consume `records`
// messages from `topic` over TLS.
//
// It joins a freshly generated (random) consumer group and starts from the
// oldest offset. The clock starts when the first message arrives. Consumer
// errors and group notifications are logged from background goroutines. A
// SIGINT received before the target count is reached ends the run early
// without printing a result.
func saramago() {
	// Adapted from the sarama-cluster example code, with minimal changes for
	// TLS and broker/topic selection.
	//
	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig()

	// init consumer
	groupID := ksuid.New().String()
	consumer, err := cluster.NewConsumer(strings.Split(brokers, ","), groupID, []string{topic}, config)
	check(err)
	defer consumer.Close()

	// Trap SIGINT to trigger a shutdown, and stop relaying signals to this
	// channel once the benchmark returns.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for n := range consumer.Notifications() {
			log.Printf("Notification: %v\n", n)
		}
	}()

	var start time.Time
	var count int
	// Testing the target in the loop condition means a non-positive -records
	// ends immediately instead of blocking forever.
	for count < records {
		select {
		case _, ok := <-consumer.Messages():
			if !ok {
				panic("messages channel unexpectedly closed")
			}
			if count == 0 {
				start = time.Now()
			}
			count++
		case <-signals:
			return
		}
	}

	// No messages means no start time; report a zero duration in that case.
	var elapsed time.Duration
	if count > 0 {
		elapsed = time.Since(start)
	}
	fmt.Printf("sarama-cluster: %v records, %v\n", count, elapsed)
}
@savaki
Copy link
Author

savaki commented Dec 31, 2017

From my desktop with TLS:

confluent: 250000 records, 34.84177593s
sarama-cluster: 250000 records, 2m20.614963038s

Net-net, confluent was roughly 4x faster on reads (34.8 s vs. 140.6 s).

@polyma
Copy link

polyma commented Mar 31, 2018

Useful. Thanks!

@otherpirate
Copy link

Really useful, thanks!

@emcfarlane
Copy link

I removed the SSL certs and ran the same benchmark, locally with a single kafka instance:

confluent: 250000 records, 1.185442044s
sarama-cluster: 250000 records, 691.240535ms

@cztchoice
Copy link

cztchoice commented Jun 15, 2019

Removed the SSL certs and ran the same benchmark against a production Kafka cluster (version 0.10 or 1.1.0):

confluent: 250000 records, 2.45766098s
sarama-cluster: 250000 records, 745.87133ms

With larger messages:

confluent: 250000 records, 4.812173908s
sarama-cluster: 250000 records, 2.349170585s

But when I change the consumer to start from the latest offset, the result is:

confluent: 250000 records, 3.445361514s
sarama-cluster: 250000 records, 4.127447438s

large message size topic:

confluent: 250000 records, 7.563664321s
sarama-cluster: 250000 records, 8.164795679s

In this case, confluent-kafka-go outperforms sarama most of the time (9 out of 10 runs).

The exception is a topic whose average message size is 200 bytes, much smaller than the other topics:

confluent: 250000 records, 1.867976538s
sarama-cluster: 250000 records, 1.230283069s

@wrfly
Copy link

wrfly commented Jan 14, 2021

Single local Kafka broker (latest Confluent version), consumer without SSL:

module x

go 1.15

require (
        github.com/Shopify/sarama v1.27.2
        github.com/bsm/sarama-cluster v2.1.15+incompatible
        github.com/confluentinc/confluent-kafka-go v1.5.2
        github.com/segmentio/ksuid v1.0.3
)

single message size: 7000+ chars

./x -records 10000
confluent: 10000 records, 115.327572ms
sarama-cluster: 10000 records, 387.687596ms

./x -records 10000
confluent: 10000 records, 122.492281ms
sarama-cluster: 10000 records, 412.542721ms

./x -records 10000
confluent: 10000 records, 167.982983ms
sarama-cluster: 10000 records, 381.887461ms

./x -records 25000
confluent: 25000 records, 359.892406ms
sarama-cluster: 25000 records, 1.017735782s

./x -records 25000
confluent: 25000 records, 288.432792ms
sarama-cluster: 25000 records, 1.045038765s

./x -records 25000
confluent: 25000 records, 393.244614ms
sarama-cluster: 25000 records, 1.004096071s

with Kafka 1.0.0 ( 7 brokers)

./x -brokers -records 50000
confluent: 50000 records, 203.889724ms
2021/01/14 17:39:23 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:26 Notification: &{rebalance OK ...
sarama-cluster: 50000 records, 287.838027ms

./x -brokers -records 100000
confluent: 100000 records, 325.483269ms
2021/01/14 17:39:41 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:44 Notification: &{rebalance OK ...
sarama-cluster: 100000 records, 720.05731ms

./x -brokers -records 100000
confluent: 100000 records, 403.692593ms
2021/01/14 17:39:50 Notification: &{rebalance start map[] map[] map[]}
2021/01/14 17:39:53 Notification: &{rebalance OK ...
sarama-cluster: 100000 records, 718.205965ms

@wrfly
Copy link

wrfly commented Jan 15, 2021

kafka cluster 2.5.0


./compare/main
confluent: 250000 records, 3.239198631s
2021/01/15 14:24:21 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:21 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.17630413s

./compare/main
confluent: 250000 records, 2.619734736s
2021/01/15 14:24:30 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:30 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.077333905s

./compare/main
confluent: 250000 records, 3.021209597s
2021/01/15 14:24:38 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:38 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 1.941509797s

./compare/main
confluent: 250000 records, 2.777872268s
2021/01/15 14:24:50 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:50 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.027456435s

./compare/main
confluent: 250000 records, 2.324268327s
2021/01/15 14:24:56 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:24:56 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.140472828s

./compare/main
confluent: 250000 records, 2.448384773s
2021/01/15 14:25:01 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:25:01 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 2.074605723s

After enabling auto-commit:

./compare -records 250000
confluent: 250000 records, 8.818076893s
2021/01/15 14:37:21 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:37:21 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 250000 records, 9.940272815s

./compare -records 500000
confluent: 500000 records, 25.078906199s
2021/01/15 14:38:08 Notification: &{rebalance start map[] map[] map[]}
2021/01/15 14:38:08 Notification: &{rebalance OK map[test:[0 1 2 3 4 5]] map[] map[test:[0 1 2 3 4 5]]}
sarama-cluster: 500000 records, 29.160564356s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment