Skip to content

Instantly share code, notes, and snippets.

@elakito
Created December 13, 2016 16:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elakito/8c33dd318def85f6544a3ba799e34c36 to your computer and use it in GitHub Desktop.
Save elakito/8c33dd318def85f6544a3ba799e34c36 to your computer and use it in GitHub Desktop.
sarama_cluster's publisher subscriber samples to test partition assignment
package main
import (
"flag"
"fmt"
"log"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
// A sample program to show the partition claiming behavior against Kafka 0.9 or 0.10.
// This sample program assumes a Kafka instance is available at localhost:9092.
// This program publishes to, subscribes to, or does both on topic "testTopic_n", where n
// is the specified random identifier suffix. The consumer part uses group ID "testGroup_n".
//
// The consumer is supposed to claim the partition when messages are published to the above topic,
// no matter how the order of subscription and publishing occurs. However, it seems the consumer
// fails to claim the partition when it subscribes to the topic first and then some messages
// are published. In other words, when the consumer subscribes to a previously non existent topic
// and the publisher publishes messages later, the consumer fails to claim the partition. When
// the consumer is restarted, it succeeds in claiming the partition. The following steps show
// this behavior:
// 1. working case where the consumer succeeds in claiming the partition
// (assuming 145 is a previously unused suffix)
// 1.1 publish some messages to test topic 145
// [Console 1]
// $ go run sc_subtest.go -rnd 145 -usepub
// 2016/12/13 17:16:41 Creating a producer for topic testTopic_145 ...
// 2016/12/13 17:16:42 Message Hello 0 published to partition 0 at offset 0
// 2016/12/13 17:16:42 Message Hello 1 published to partition 0 at offset 1
// 2016/12/13 17:16:42 Message Hello 2 published to partition 0 at offset 2
// 2016/12/13 17:16:42 Message Hello 3 published to partition 0 at offset 3
// 2016/12/13 17:16:42 Message Hello 4 published to partition 0 at offset 4
// 2016/12/13 17:16:42 Published: 5
// $
// 1.2 subscribe to test topic 145
// [Console 2]
// $ go run sc_subtest.go -rnd 145 -usesub
// 2016/12/13 17:18:00 Creating a consumer for topic testTopic_145 with groupdId testGroup_145 ...
// 2016/12/13 17:18:00 Wait for 300 seconds before shutting down ...
// Starting to poll ...
// 2016/12/13 17:18:00 Rebalanced
// 2016/12/13 17:18:00 Claimed partition [testTopic_145/0]
//
// 1.3 publish more messages to test topic 145
// [Console 1]
// $ go run sc_subtest.go -rnd 145 -usepub
// 2016/12/13 17:19:16 Creating a producer for topic testTopic_145 ...
// 2016/12/13 17:19:16 Message Hello 0 published to partition 0 at offset 5
// 2016/12/13 17:19:16 Message Hello 1 published to partition 0 at offset 6
// ...
// [Console 2]
// 2016/12/13 17:19:16 Consumed testTopic_145/0/5 value 'Hello 0'
// 2016/12/13 17:19:16 Consumed testTopic_145/0/6 value 'Hello 1'
// 2016/12/13 17:19:16 Consumed testTopic_145/0/7 value 'Hello 2'
// 2016/12/13 17:19:16 Consumed testTopic_145/0/8 value 'Hello 3'
// 2016/12/13 17:19:16 Consumed testTopic_145/0/9 value 'Hello 4'
//
// 2. not working case where the consumer fails to claim the partition
// (assuming 146 is a previously unused suffix)
// 2.1 subscribe to test topic 146
// $ go run sc_subtest.go -rnd 146 -usesub
// [Console 2]
// 2016/12/13 17:23:23 Creating a consumer for topic testTopic_146 with groupdId testGroup_146 ...
// 2016/12/13 17:23:23 Wait for 300 seconds before shutting down ...
// Starting to poll ...
// 2016/12/13 17:23:24 Rebalanced
//
// 2.2 publish some messages to test topic 146
// [Console 1]
// $ go run sc_subtest.go -rnd 146 -usepub
// 2016/12/13 17:24:29 Creating a producer for topic testTopic_146 ...
// 2016/12/13 17:24:29 Message Hello 0 published to partition 0 at offset 0
// 2016/12/13 17:24:29 Message Hello 1 published to partition 0 at offset 1
// 2016/12/13 17:24:29 Message Hello 2 published to partition 0 at offset 2
// 2016/12/13 17:24:29 Message Hello 3 published to partition 0 at offset 3
// 2016/12/13 17:24:29 Message Hello 4 published to partition 0 at offset 4
// 2016/12/13 17:24:29 Published: 5
//[Console 2]
//
// 3. not working case (same as case 2) but using a single program for convenience
// [Console 1]
// $ go run sc_subtest.go -rnd 147 -usepub -usesub
// 2016/12/13 17:29:34 Creating a consumer for topic testTopic_147 with groupdId testGroup_147 ...
// 2016/12/13 17:29:34 Creating a producer for topic testTopic_147 ...
// Starting to poll ...
// 2016/12/13 17:29:35 Rebalanced
// 2016/12/13 17:29:35 Message Hello 0 published to partition 0 at offset 0
// 2016/12/13 17:29:35 Message Hello 1 published to partition 0 at offset 1
// 2016/12/13 17:29:35 Message Hello 2 published to partition 0 at offset 2
// 2016/12/13 17:29:35 Message Hello 3 published to partition 0 at offset 3
// 2016/12/13 17:29:35 Message Hello 4 published to partition 0 at offset 4
// 2016/12/13 17:29:35 Published: 5
// 2016/12/13 17:29:35 Wait for 300 seconds before shutting down ...
//
// main runs the publisher and/or the subscriber part of the sample, depending
// on the -usepub and -usesub flags.
//
// Flags:
//
//	-brokers        comma-separated bootstrap broker addresses
//	-rnd            suffix appended to "testTopic_" and "testGroup_"
//	-max_wait_time  consumer MaxWaitTime in milliseconds
//	-usepub         publish five "Hello <i>" messages to the topic
//	-usesub         subscribe to the topic and consume for 300 seconds
//
// When both flags are set, the consumer is created and starts polling before
// the messages are published.
func main() {
	brokerStr := flag.String("brokers", "localhost:9092", "bootstrap broker addresses")
	rnd := flag.String("rnd", "0", "random identifier to be used as suffix")
	maxwaittime := flag.Int("max_wait_time", 1000, "max wait time")
	usepub := flag.Bool("usepub", false, "activate the pub part")
	usesub := flag.Bool("usesub", false, "activate the sub part")
	flag.Parse()

	topic := fmt.Sprintf("testTopic_%s", *rnd)
	groupid := fmt.Sprintf("testGroup_%s", *rnd)
	brokers := strings.Split(*brokerStr, ",")

	config := cluster.NewConfig()
	config.Consumer.MaxWaitTime = time.Duration(*maxwaittime) * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	var consumer *cluster.Consumer
	var producer sarama.SyncProducer
	var err error

	if *usesub {
		log.Printf("Creating a consumer for topic %s with groupdId %s ...", topic, groupid)
		consumer, err = cluster.NewConsumer(brokers, groupid, []string{topic}, config)
		if err != nil {
			panic(err)
		}
		defer func() {
			if err := consumer.Close(); err != nil {
				log.Printf("[ERROR] %v", err)
			}
		}()

		// Drain the error channel enabled by Consumer.Return.Errors above.
		go func() {
			for err := range consumer.Errors() {
				log.Printf("Error: %s", err.Error())
			}
		}()

		// Report every rebalance together with the partitions it claimed and
		// released. The per-topic values are slices, so the partition ID is
		// the range VALUE, not the index.
		go func() {
			for note := range consumer.Notifications() {
				log.Println("Rebalanced")
				for topic, partitions := range note.Claimed {
					for _, partition := range partitions {
						log.Printf("Claimed partition [%s/%d]", topic, partition)
					}
				}
				for topic, partitions := range note.Released {
					for _, partition := range partitions {
						log.Printf("Released partition [%s/%d]", topic, partition)
					}
				}
			}
		}()
	}

	if *usepub {
		log.Printf("Creating a producer for topic %s ...", topic)
		producer, err = sarama.NewSyncProducer(brokers, nil)
		if err != nil {
			panic(err)
		}
		// Release the producer's resources on every exit path, including the
		// log.Panicf below.
		defer func() {
			if err := producer.Close(); err != nil {
				log.Printf("[ERROR] %v", err)
			}
		}()
	}

	consumed := 0
	published := 0
	terminate := make(chan struct{}) // closed by main to stop the consumer loop
	done := make(chan struct{})      // closed by the consumer loop when it has exited

	if *usesub {
		// start consuming messages
		go func() {
			defer close(done)
			fmt.Println("Starting to poll ...")
			for {
				select {
				case msg, ok := <-consumer.Messages():
					if !ok {
						// The message channel was closed; nothing more to read.
						return
					}
					log.Printf("Consumed %s/%d/%d \tvalue\t '%s'\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
					consumed++
					consumer.MarkOffset(msg, "")
				case <-terminate:
					return
				}
			}
		}()
	}

	if *usepub {
		// publish some messages ....
		for i := 0; i < 5; i++ {
			msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(fmt.Sprintf("Hello %d", i))}
			p, o, err := producer.SendMessage(msg)
			if err != nil {
				log.Panicf("Ping message could not be published: %v", err)
			}
			published++
			log.Printf("Message %s published to partition %d at offset %d", msg.Value, p, o)
		}
		log.Printf("Published: %d\n", published)
	}

	if *usesub {
		log.Println("Wait for 300 seconds before shutting down ...")
		time.Sleep(time.Second * 300)
		close(terminate)
		// Wait for the consumer loop to finish so that reading `consumed`
		// below does not race with its increments.
		<-done
		log.Printf("Consumed: %d\n", consumed)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment