Skip to content

Instantly share code, notes, and snippets.

@mufti1
Created March 5, 2019 10:25
Show Gist options
  • Save mufti1/5019cb348e77c73618fd3cec57fdf0b3 to your computer and use it in GitHub Desktop.
Save mufti1/5019cb348e77c73618fd3cec57fdf0b3 to your computer and use it in GitHub Desktop.
package consumer
import (
"os"
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)
// KafkaConsumer hold sarama consumer
type KafkaConsumer struct {
Consumer sarama.Consumer
}
// Consume function to consume message from apache kafka
func (c *KafkaConsumer) Consume(topics []string, signals chan os.Signal) {
chanMessage := make(chan *sarama.ConsumerMessage, 256)
for _, topic := range topics {
partitionList, err := c.Consumer.Partitions(topic)
if err != nil {
logrus.Errorf("Unable to get partition got error %v", err)
continue
}
for _, partition := range partitionList {
go consumeMessage(c.Consumer, topic, partition, chanMessage)
}
}
logrus.Infof("Kafka is consuming....")
ConsumerLoop:
for {
select {
case msg := <-chanMessage:
logrus.Infof("New Message from kafka, message: %v", string(msg.Value))
case sig := <-signals:
if sig == os.Interrupt {
break ConsumerLoop
}
}
}
}
func consumeMessage(consumer sarama.Consumer, topic string, partition int32, c chan *sarama.ConsumerMessage) {
msg, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
logrus.Errorf("Unable to consume partition %v got error %v", partition, err)
return
}
defer func() {
if err := msg.Close(); err != nil {
logrus.Errorf("Unable to close partition %v: %v", partition, err)
}
}()
for {
msg := <-msg.Messages()
c <- msg
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment