Skip to content

Instantly share code, notes, and snippets.

@sajal
Created February 15, 2015 13:32
Show Gist options
  • Save sajal/6df94621335caa77f80f to your computer and use it in GitHub Desktop.
Save sajal/6df94621335caa77f80f to your computer and use it in GitHub Desktop.
example go_kafka_client high cpu
package main
//salvaged from https://github.com/stealthly/go_kafka_client/blob/master/main/main.go
import (
"github.com/stealthly/go_kafka_client"
"log"
"fmt"
// "time"
)
func main(){
topic := "test"
startNewConsumer(topic)
<- make(chan struct{}) //forever
}
func startNewConsumer(topic string) *go_kafka_client.Consumer {
consumer := createConsumer()
topics := map[string]int {topic : 1}
go func() {
consumer.StartStatic(topics)
}()
return consumer
}
func createConsumer() *go_kafka_client.Consumer {
config := go_kafka_client.DefaultConsumerConfig()
coordinatorConfig := go_kafka_client.NewZookeeperConfig()
coordinatorConfig.ZookeeperConnect = []string{"127.0.0.1:2181"}
coordinator := go_kafka_client.NewZookeeperCoordinator(coordinatorConfig)
config.Coordinator = coordinator
// config.AutoOffsetReset = "smallest"
// config.FetchBatchSize = 10
// config.FetchBatchTimeout = 3*time.Second
// config.FetchMessageMaxBytes = 1024 * 1024 * 4
// config.WorkerTaskTimeout = 10*time.Second
// config.NumWorkers = 1
config.Strategy = Strategy
// config.WorkerRetryThreshold = 100
config.WorkerFailureCallback = FailedCallback
config.WorkerFailedAttemptCallback = FailedAttemptCallback
log.Println(config)
consumer := go_kafka_client.NewConsumer(config)
return consumer
}
func Strategy(worker *go_kafka_client.Worker, msg *go_kafka_client.Message, id go_kafka_client.TaskId) go_kafka_client.WorkerResult {
fmt.Println(msg)
return go_kafka_client.NewSuccessfulResult(id)
}
func FailedCallback(wm *go_kafka_client.WorkerManager) go_kafka_client.FailedDecision {
go_kafka_client.Info("main", "Failed callback")
return go_kafka_client.DoNotCommitOffsetAndStop
}
func FailedAttemptCallback(task *go_kafka_client.Task, result go_kafka_client.WorkerResult) go_kafka_client.FailedDecision {
go_kafka_client.Info("main", "Failed attempt")
return go_kafka_client.CommitOffsetAndContinue
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment