Skip to content

Instantly share code, notes, and snippets.

@s4kibs4mi
Last active May 28, 2019 08:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s4kibs4mi/959d32e24eb6bf2208a1a2532893ae99 to your computer and use it in GitHub Desktop.
Save s4kibs4mi/959d32e24eb6bf2208a1a2532893ae99 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "notification_service",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"users.notification"}, nil)
for {
fmt.Println("Waiting for messages...")
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Println("Topic : ", *msg.TopicPartition.Topic)
fmt.Println("Partition : ", msg.TopicPartition.Partition)
fmt.Println("Offset : ", msg.TopicPartition.Offset)
fmt.Println("Value : ", string(msg.Value))
c.CommitMessage(msg)
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment