-
-
Save sajal/6df94621335caa77f80f to your computer and use it in GitHub Desktop.
Example: go_kafka_client high CPU usage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
//salvaged from https://github.com/stealthly/go_kafka_client/blob/master/main/main.go | |
import ( | |
"github.com/stealthly/go_kafka_client" | |
"log" | |
"fmt" | |
// "time" | |
) | |
func main(){ | |
topic := "test" | |
startNewConsumer(topic) | |
<- make(chan struct{}) //forever | |
} | |
func startNewConsumer(topic string) *go_kafka_client.Consumer { | |
consumer := createConsumer() | |
topics := map[string]int {topic : 1} | |
go func() { | |
consumer.StartStatic(topics) | |
}() | |
return consumer | |
} | |
func createConsumer() *go_kafka_client.Consumer { | |
config := go_kafka_client.DefaultConsumerConfig() | |
coordinatorConfig := go_kafka_client.NewZookeeperConfig() | |
coordinatorConfig.ZookeeperConnect = []string{"127.0.0.1:2181"} | |
coordinator := go_kafka_client.NewZookeeperCoordinator(coordinatorConfig) | |
config.Coordinator = coordinator | |
// config.AutoOffsetReset = "smallest" | |
// config.FetchBatchSize = 10 | |
// config.FetchBatchTimeout = 3*time.Second | |
// config.FetchMessageMaxBytes = 1024 * 1024 * 4 | |
// config.WorkerTaskTimeout = 10*time.Second | |
// config.NumWorkers = 1 | |
config.Strategy = Strategy | |
// config.WorkerRetryThreshold = 100 | |
config.WorkerFailureCallback = FailedCallback | |
config.WorkerFailedAttemptCallback = FailedAttemptCallback | |
log.Println(config) | |
consumer := go_kafka_client.NewConsumer(config) | |
return consumer | |
} | |
func Strategy(worker *go_kafka_client.Worker, msg *go_kafka_client.Message, id go_kafka_client.TaskId) go_kafka_client.WorkerResult { | |
fmt.Println(msg) | |
return go_kafka_client.NewSuccessfulResult(id) | |
} | |
func FailedCallback(wm *go_kafka_client.WorkerManager) go_kafka_client.FailedDecision { | |
go_kafka_client.Info("main", "Failed callback") | |
return go_kafka_client.DoNotCommitOffsetAndStop | |
} | |
func FailedAttemptCallback(task *go_kafka_client.Task, result go_kafka_client.WorkerResult) go_kafka_client.FailedDecision { | |
go_kafka_client.Info("main", "Failed attempt") | |
return go_kafka_client.CommitOffsetAndContinue | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment