Skip to content

Instantly share code, notes, and snippets.

@mancubus77
Created January 8, 2021 03:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mancubus77/7faa530444abf9c6ba086d8d39185916 to your computer and use it in GitHub Desktop.
Save mancubus77/7faa530444abf9c6ba086d8d39185916 to your computer and use it in GitHub Desktop.
package main
import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// makeTimestamp reports the current wall-clock time as the number of
// nanoseconds elapsed since the Unix epoch.
func makeTimestamp() int64 {
	now := time.Now()
	return now.UnixNano()
}
// main creates a Kafka producer and publishes one synthetic metric line to
// the topic "topic" every 500ms. Each payload is a line-protocol style string
// carrying a random integer value and the current time in Unix nanoseconds.
//
// The loop runs until the process receives SIGINT or SIGTERM; it then waits
// for queued messages to be delivered and closes the producer. It panics if
// the producer cannot be created.
func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "my-cluster-kafka-brokers.kafka.svc.cluster.local:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages. The goroutine lives for
	// as long as the producer's Events channel stays open.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Buffered so a signal that arrives while the loop is busy producing is
	// not dropped by the signal package.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	// Produce messages to topic (asynchronously).
	topic := "topic"
loop:
	for {
		word := fmt.Sprintf("test_master_metric,go_version=1.15.6,version=1.17.0 metric=%vi %v", rand.Intn(10000000), makeTimestamp())
		// Produce only enqueues the message; an error here means it was never
		// queued, so no delivery report will follow for it.
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil); err != nil {
			fmt.Printf("Produce failed: %v\n", err)
		}

		select {
		case <-sigs:
			break loop
		case <-ticker.C:
		}
	}

	// Wait up to 15s (the argument is in milliseconds) for outstanding
	// messages before the deferred Close runs.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("Flush timed out with %d message(s) still undelivered\n", remaining)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment