Skip to content

Instantly share code, notes, and snippets.

@mihasya
Created March 31, 2015 05:51
Show Gist options
  • Save mihasya/8d5536c881d8292049dc to your computer and use it in GitHub Desktop.
Save mihasya/8d5536c881d8292049dc to your computer and use it in GitHub Desktop.
package warlock
import (
"log"
kafkaClient "github.com/stealthly/go_kafka_client"
)
type TopicConfig struct {
MaxPartitions int
Strategy kafkaClient.WorkerStrategy
FailedAttemptCallback kafkaClient.FailedAttemptCallback
}
type MultiConsumer struct {
Topics map[string]TopicConfig
FailedCallback kafkaClient.FailedCallback
}
func NewMultiConsumer(fc kafkaClient.FailedCallback) *MultiConsumer {
return &MultiConsumer{make(map[string]TopicConfig), fc}
}
func (self *MultiConsumer) RegisterTopic(name string, c TopicConfig) (err error) {
self.Topics[name] = c
return
}
func (self *MultiConsumer) Strategy(w *kafkaClient.Worker, msg *kafkaClient.Message, taskId kafkaClient.TaskId) kafkaClient.WorkerResult {
topicConfig, known := self.Topics[msg.Topic]
if !known {
// TODO return error, we don't know this topic
}
return topicConfig.Strategy(w, msg, taskId)
}
func (self *MultiConsumer) FailedAttemptCallback(t *kafkaClient.Task, r kafkaClient.WorkerResult) kafkaClient.FailedDecision {
topicConfig, known := self.Topics[t.Msg.Topic]
if !known {
// TODO return error, we don't know this topic
}
return topicConfig.FailedAttemptCallback(t, r)
}
func (self *MultiConsumer) Start(kafkaConf *kafkaClient.ConsumerConfig) (err error) {
kafkaConf.Strategy = self.Strategy
kafkaConf.WorkerFailureCallback = self.FailedCallback
kafkaConf.WorkerFailedAttemptCallback = self.FailedAttemptCallback
client := kafkaClient.NewConsumer(kafkaConf)
topicMap := make(map[string]int)
for topic, conf := range self.Topics {
topicMap[topic] = conf.MaxPartitions
}
log.Printf("starting multi consumer with map %v", topicMap)
client.StartStatic(topicMap)
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment