Skip to content

Instantly share code, notes, and snippets.

@abhirockzz
Last active November 30, 2018 11:38
Show Gist options
  • Save abhirockzz/9c9b9e96d0d9658692a25a9a9472a6a2 to your computer and use it in GitHub Desktop.
Save abhirockzz/9c9b9e96d0d9658692a25a9a9472a6a2 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
ctrlCSignal := make(chan os.Signal, 1)
signal.Notify(ctrlCSignal, syscall.SIGTERM)
p, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
go func() {
for kafkaEvt := range p.Events() {
switch kafkaEvt.(type) {
case kafka.Error:
e := kafkaEvt.(kafka.Error)
fmt.Println("error - " + e.String())
case *kafka.Message:
m := kafkaEvt.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
default:
fmt.Println("got event from kafka ", kafkaEvt.String())
}
}
}()
topic := "foo"
partition := kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}
go func() {
for {
msg := &kafka.Message{TopicPartition: partition, Key: []byte("key-" + time.Now().String()), Value: []byte("value-" + time.Now().String())}
p.Produce(msg, nil)
time.Sleep(5 * time.Second) //take it easy
}
}()
<-ctrlCSignal
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment