-
-
Save mihasya/8d5536c881d8292049dc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package warlock | |
import ( | |
"log" | |
kafkaClient "github.com/stealthly/go_kafka_client" | |
) | |
type TopicConfig struct { | |
MaxPartitions int | |
Strategy kafkaClient.WorkerStrategy | |
FailedAttemptCallback kafkaClient.FailedAttemptCallback | |
} | |
type MultiConsumer struct { | |
Topics map[string]TopicConfig | |
FailedCallback kafkaClient.FailedCallback | |
} | |
func NewMultiConsumer(fc kafkaClient.FailedCallback) *MultiConsumer { | |
return &MultiConsumer{make(map[string]TopicConfig), fc} | |
} | |
func (self *MultiConsumer) RegisterTopic(name string, c TopicConfig) (err error) { | |
self.Topics[name] = c | |
return | |
} | |
func (self *MultiConsumer) Strategy(w *kafkaClient.Worker, msg *kafkaClient.Message, taskId kafkaClient.TaskId) kafkaClient.WorkerResult { | |
topicConfig, known := self.Topics[msg.Topic] | |
if !known { | |
// TODO return error, we don't know this topic | |
} | |
return topicConfig.Strategy(w, msg, taskId) | |
} | |
func (self *MultiConsumer) FailedAttemptCallback(t *kafkaClient.Task, r kafkaClient.WorkerResult) kafkaClient.FailedDecision { | |
topicConfig, known := self.Topics[t.Msg.Topic] | |
if !known { | |
// TODO return error, we don't know this topic | |
} | |
return topicConfig.FailedAttemptCallback(t, r) | |
} | |
func (self *MultiConsumer) Start(kafkaConf *kafkaClient.ConsumerConfig) (err error) { | |
kafkaConf.Strategy = self.Strategy | |
kafkaConf.WorkerFailureCallback = self.FailedCallback | |
kafkaConf.WorkerFailedAttemptCallback = self.FailedAttemptCallback | |
client := kafkaClient.NewConsumer(kafkaConf) | |
topicMap := make(map[string]int) | |
for topic, conf := range self.Topics { | |
topicMap[topic] = conf.MaxPartitions | |
} | |
log.Printf("starting multi consumer with map %v", topicMap) | |
client.StartStatic(topicMap) | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment