
@KJTsanaktsidis
Created April 9, 2020 07:01
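
// This gist spins up 20 goroutines that continuously publish messages to a
// "stuff" topic on a Kafka broker at localhost:9092, using Shopify/sarama with
// an idempotent producer (saramaTest); an equivalent confluent-kafka-go path
// (confluentTest) is included but commented out in main.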
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	saramaTest()
	//confluentTest()
}
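
// saramaTest configures an idempotent sarama producer against localhost:9092,
// creates the 20-partition "stuff" topic, and starts the publishing goroutines.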
func saramaTest() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_4_0_0
	saramaConfig.Net.KeepAlive = 30 * time.Second
	// Idempotent production in sarama requires at most one in-flight request per broker.
	saramaConfig.Net.MaxOpenRequests = 1
	//saramaConfig.Producer.Partitioner = sarama.NewManualPartitioner
	saramaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
	saramaConfig.Producer.Compression = sarama.CompressionSnappy
	saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
	saramaConfig.Producer.Idempotent = true
	saramaConfig.Producer.Return.Successes = true
	saramaConfig.Producer.Return.Errors = true
	saramaConfig.Producer.Retry.Max = 50
	saramaConfig.Producer.Retry.Backoff = 100 * time.Millisecond
	saramaConfig.Producer.Flush.Bytes = 1000000
	saramaConfig.Producer.Flush.Frequency = 100 * time.Millisecond
	sarama.Logger = log.New(os.Stdout, "[sarama] ", 0)

	client, err := sarama.NewClient([]string{"localhost:9092"}, saramaConfig)
	if err != nil {
		panic(err)
	}

	// Create the test topic directly against the first broker.
	broker := client.Brokers()[0]
	err = broker.Open(saramaConfig)
	if err != nil {
		panic(err)
	}
	_, err = broker.CreateTopics(&sarama.CreateTopicsRequest{
		TopicDetails: map[string]*sarama.TopicDetail{
			"stuff": {
				NumPartitions:     20,
				ReplicationFactor: 1,
				ConfigEntries:     map[string]*string{},
			},
		},
	})
	if err != nil {
		panic(err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		panic(err)
	}

	// Launch 20 concurrent publishing goroutines.
	for i := 0; i < 20; i++ {
		grNo := i
		go saramaPublishStuff(producer, grNo)
	}
	time.Sleep(100 * time.Hour)
}
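
// saramaPublishStuff publishes messages in a tight loop via the shared
// SyncProducer, logging each result to stdout.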
func saramaPublishStuff(producer sarama.SyncProducer, grNo int) {
	counter := 0
	for {
		msg := &sarama.ProducerMessage{
			Topic:     "stuff",
			Partition: int32(grNo),
			//Key: sarama.StringEncoder(fmt.Sprintf("sarama_%d_%d", grNo, counter)),
			Value: sarama.ByteEncoder([]byte(fmt.Sprintf("sarama_%d_%d", grNo, counter))),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			fmt.Printf("(publishing %d, %d) ERROR: %+v\n", grNo, counter, err)
		} else {
			fmt.Printf("(publishing %d, %d) OK (at partition %d, offset %d)\n", grNo, counter, partition, offset)
		}
		counter++
	}
}
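
// confluentTest is the confluent-kafka-go equivalent of saramaTest: it builds
// an idempotent librdkafka producer and starts the same publishing loop.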
func confluentTest() {
	kafkaConfig := &kafka.ConfigMap{
		"bootstrap.servers":                     "localhost:9092",
		"socket.keepalive.enable":               true,
		"max.in.flight.requests.per.connection": 1,
		"compression.codec":                     "snappy",
		// NOTE: "request.required.acks"/"acks" and "retries"/"message.send.max.retries"
		// are alias pairs for the same librdkafka properties, and the idempotent
		// producer requires acks=all.
		"request.required.acks":      1,
		"enable.idempotence":         true,
		"acks":                       "all",
		"retries":                    1,
		"message.send.max.retries":   1,
		"retry.backoff.ms":           100,
		"queue.buffering.max.kbytes": 1000000,
		"queue.buffering.max.ms":     100,
	}
	producer, err := kafka.NewProducer(kafkaConfig)
	if err != nil {
		panic(err)
	}

	// Launch 20 concurrent publishing goroutines.
	for i := 0; i < 20; i++ {
		grNo := i
		go confluentPublishStuff(producer, grNo)
	}
	time.Sleep(100 * time.Hour)
}
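
// confluentPublishStuff publishes messages in a tight loop via the confluent
// producer, waiting for each message's delivery report before sending the next.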
func confluentPublishStuff(producer *kafka.Producer, grNo int) {
	counter := 0
	topic := "stuff"
	for {
		msg := kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &topic,
				Partition: int32(grNo),
			},
			Key:   []byte(fmt.Sprintf("confluent_%d_%d", grNo, counter)),
			Value: []byte(fmt.Sprintf("confluent_%d_%d", grNo, counter)),
		}
		// Use a per-message delivery channel so this goroutine can block until
		// the delivery report for its own message arrives.
		ev := make(chan kafka.Event)
		err := producer.Produce(&msg, ev)
		if err != nil {
			fmt.Printf("(publishing %d, %d): PRODUCE_ERROR: %+v\n", grNo, counter, err)
		} else {
			publishEvent := <-ev
			if deliveryReport, ok := publishEvent.(*kafka.Message); ok {
				fmt.Printf("(publishing %d, %d): OK (at partition %d, offset %d)\n", grNo, counter, deliveryReport.TopicPartition.Partition, deliveryReport.TopicPartition.Offset)
			} else {
				fmt.Printf("(publishing %d, %d): UNKNOWN_EVENT: %+v\n", grNo, counter, publishEvent)
			}
		}
		close(ev)
		counter++
	}
}