Skip to content

Instantly share code, notes, and snippets.

@andela-sjames
Created January 26, 2020 20:37
Show Gist options
  • Save andela-sjames/099bafd7e7c9d01a6915ee05cb87404e to your computer and use it in GitHub Desktop.
Save andela-sjames/099bafd7e7c9d01a6915ee05cb87404e to your computer and use it in GitHub Desktop.
// StartConsumerHandler defined
func StartConsumerHandler(w http.ResponseWriter, r *http.Request) {
// to consume messages
// make a new reader that consumes from sampleOne
fmt.Println("Consumer started")
go func() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"broker:9092", "broker:9093", "broker:9093"},
GroupID: "consumer-Sample-One",
Topic: "sampleOne",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // flushes commits to Kafka every second
})
for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
reader.Close()
}()
w.Header().Add("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "200",
"message": "Consumer started",
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment