Skip to content

Instantly share code, notes, and snippets.

@mantzas
Created January 29, 2019 20:32
Show Gist options
  • Save mantzas/dbd5a26c91bcadc2f778d8aa05c907f7 to your computer and use it in GitHub Desktop.
Save mantzas/dbd5a26c91bcadc2f778d8aa05c907f7 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka"
)
// main runs a confluent-kafka-go consumer benchmark: it subscribes to
// "test_topic" on localhost:9092, counts every message delivered on the
// events channel and prints throughput statistics whenever a partition EOF is
// reached and once more on shutdown. It stops on SIGINT/SIGTERM or on the
// first kafka.Error event.
func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	cnt := beatkafka.NewCounter()
	topic := "test_topic"

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               "localhost:9092",
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"group.id":                        "test-id",
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"}})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Consumer %v\n", c)

	// A failed subscription would otherwise leave the loop below waiting for
	// events that never arrive.
	if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe to topic %s: %s\n", topic, err)
		_ = c.Close() // best effort: the process is exiting anyway
		os.Exit(1)
	}

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		case ev := <-c.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				if err := c.Assign(e.Partitions); err != nil {
					fmt.Fprintf(os.Stderr, "%% Assign failed: %v\n", err)
				}
			case kafka.RevokedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				if err := c.Unassign(); err != nil {
					fmt.Fprintf(os.Stderr, "%% Unassign failed: %v\n", err)
				}
			case *kafka.Message:
				cnt.Inc()
			case kafka.PartitionEOF:
				fmt.Printf("%% Reached %v\n", e)
				cnt.PrintStats()
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				run = false
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	if err := c.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to close consumer: %s\n", err)
	}
	cnt.PrintStats()
}
package main
import (
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka"
)
// main runs a confluent-kafka-go producer benchmark: one goroutine pushes the
// same small message onto the produce channel as fast as it is accepted, a
// second goroutine counts successful delivery reports, and the main loop
// prints throughput statistics once per second until SIGINT/SIGTERM arrives.
func main() {
	cnt := beatkafka.NewCounter()
	broker := "localhost:9092"
	topic := "test_topic"
	msg := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("test")}

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}

	// done is closed to tell both workers to stop. A closed channel is
	// observed race-free by every goroutine and, unlike a polled flag, wakes
	// up a worker that is currently blocked on a channel operation.
	done := make(chan struct{})

	// Register both workers before starting them so that wg.Wait can never
	// run ahead of the corresponding Add.
	var wg sync.WaitGroup
	wg.Add(2)

	// Producer worker: enqueue messages until asked to stop.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case p.ProduceChannel() <- &msg:
			}
		}
	}()

	// Delivery worker: count messages that were delivered without error.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case e := <-p.Events():
				if ev, ok := e.(*kafka.Message); ok && ev.TopicPartition.Error == nil {
					cnt.Inc()
				}
			}
		}
	}()

	// A single ticker replaces allocating a fresh timer on every iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating", sig)
			close(done)
			wg.Wait()
			p.Flush(1000)
			//p.Close() this one blocks forever
			cnt.PrintStats()
			return
		case <-ticker.C:
			cnt.PrintStats()
		}
	}
}
package kafka
import (
"log"
"sync"
"sync/atomic"
"time"
)
// Counter counts processed messages and reports the average throughput since
// it was created. Inc and PrintStats are safe for concurrent use.
type Counter struct {
	// cnt is only accessed through sync/atomic. It is the first field so that
	// it is 64-bit aligned on 32-bit platforms, as sync/atomic requires.
	cnt int64
	// Mutex is not used by Counter's own methods; it stays embedded so that
	// existing callers relying on Lock/Unlock keep compiling.
	sync.Mutex
	// start is the creation time and is never modified afterwards.
	start time.Time
}

// NewCounter returns a Counter at zero whose clock starts now.
func NewCounter() *Counter {
	return &Counter{cnt: 0, start: time.Now()}
}

// Inc atomically adds one to the counter.
func (c *Counter) Inc() {
	atomic.AddInt64(&c.cnt, 1)
}

// PrintStats logs the number of messages counted so far together with the
// average rate in messages per second since the Counter was created.
func (c *Counter) PrintStats() {
	// Take one atomic snapshot and use it for both the total and the rate, so
	// the two logged numbers are consistent and the read pairs correctly with
	// the atomic write in Inc.
	count := atomic.LoadInt64(&c.cnt)
	dur := time.Since(c.start)
	rate := float64(count) / dur.Seconds()
	log.Printf("Processed %d message with a rate of %f msg/sec", count, rate)
}
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/Shopify/sarama"
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka"
)
// consumerGroupHandler is the sarama.ConsumerGroupHandler used by the
// benchmark: it counts every message it is handed and stops when a signal is
// received.
type consumerGroupHandler struct {
	cnt     *beatkafka.Counter // incremented once per consumed message
	sigChan <-chan os.Signal   // a value received here ends ConsumeClaim
}

// Setup is a no-op.
func (*consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup is a no-op.
func (*consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim counts the messages of one claim until either a signal arrives
// on sigChan or the claim's message channel is closed. It always returns nil.
func (cgh *consumerGroupHandler) ConsumeClaim(ses sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case sig := <-cgh.sigChan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return nil
		case _, ok := <-claim.Messages():
			if !ok {
				// A closed channel yields zero values forever; without this
				// check the loop would spin and inflate the counter once the
				// claim ends (e.g. on rebalance or session shutdown).
				return nil
			}
			cgh.cnt.Inc()
		}
	}
}
// main runs a sarama consumer-group benchmark: it joins the "sarama-test"
// group on localhost:9092, consumes "test_topic" from the oldest offset with
// a counting handler and prints throughput statistics once consumption ends.
func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	cnt := beatkafka.NewCounter()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Consumer.Return.Errors = true

	gr, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "sarama-test", cfg)
	if err != nil {
		log.Fatalf("failed to create consumer group: %v", err)
	}

	// Consumer.Return.Errors is enabled above, so errors are delivered on the
	// Errors channel instead of being logged by sarama; read them here so they
	// are not lost. The loop ends when the group is closed.
	go func() {
		for cerr := range gr.Errors() {
			log.Printf("consumer group error: %v", cerr)
		}
	}()

	topics := []string{"test_topic"}
	handler := consumerGroupHandler{cnt: cnt, sigChan: sigchan}
	err = gr.Consume(context.Background(), topics, &handler)

	// Close explicitly rather than via defer: log.Fatalf below exits the
	// process and would skip deferred calls.
	if cerr := gr.Close(); cerr != nil {
		log.Printf("failed to close consumer group: %v", cerr)
	}
	if err != nil {
		log.Fatalf("failed to consume group: %v", err)
	}
	cnt.PrintStats()
}
package main
import (
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka"
)
// main runs a sarama async-producer benchmark: one goroutine feeds small
// messages into the producer as fast as they are accepted, a second goroutine
// counts acknowledged messages and logs failures, and the main loop prints
// throughput statistics once per second until SIGINT/SIGTERM arrives.
func main() {
	cnt := beatkafka.NewCounter()
	brokers := []string{"localhost:9092"}
	topic := "test_topic"

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0
	config.Producer.Return.Successes = true

	prod, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}

	// done is closed to tell both workers to stop. A closed channel is
	// observed race-free by every goroutine and, unlike a polled flag, wakes
	// up a worker that is currently blocked on a channel operation.
	done := make(chan struct{})

	// Register both workers before starting them so that wg.Wait can never
	// run ahead of the corresponding Add.
	var wg sync.WaitGroup
	wg.Add(2)

	// Input worker: enqueue messages until asked to stop. Each send uses its
	// own ProducerMessage because the producer writes to the message while it
	// is in flight, so one shared pointer must not be queued repeatedly.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case prod.Input() <- &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder("test")}:
			}
		}
	}()

	// Result worker: drain both result channels. Successes must be read
	// because Return.Successes is enabled, and Errors because Return.Errors
	// is on by default; leaving either unread stalls the producer.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case <-prod.Successes():
				cnt.Inc()
			case perr := <-prod.Errors():
				log.Printf("failed to produce message: %v", perr)
			}
		}
	}()

	// A single ticker replaces allocating a fresh timer on every iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating", sig)
			close(done)
			wg.Wait()
			// Close flushes buffered messages and drains the result channels
			// itself, so the workers are no longer needed at this point.
			if cerr := prod.Close(); cerr != nil {
				log.Printf("failed to close producer: %v", cerr)
			}
			cnt.PrintStats()
			return
		case <-ticker.C:
			cnt.PrintStats()
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment