@zgiber
Last active December 5, 2019 18:52
Confluent Kafka Go package - unexpected behaviour (?)
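The program below is a self-contained repro against a local three-broker cluster: it (re)creates a five-partition topic, drains it for the test consumer group, produces 30 small messages, and then consumes them while recording every offset it sees. Partway through consumption it calls Seek followed by StoreOffsets on the current partition and logs any offset that is delivered more than once. Observations about Flush, GetMetadata and DeleteTopics are noted inline as comments.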
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)
const (
	mb = 1024 * 1024
)

var (
	// assuming a setup of 3 brokers:
	topicPartitions        = 5
	topicReplicationFactor = 3
	topic                  = "test-topic-1"
	broker                 = "localhost:9092,localhost:9093,localhost:9094"
	group                  = "test-group"
)
func main() {
	// publish N messages
	messageCount := 30

	// clean(ish) start
	createTopic()
	waitForTopic(topic)
	drainTopic(topic)

	// produce them
	produce(messageCount)

	ctx := withSignal(context.Background())
	err := consume(ctx)
	if err != nil {
		log.Println(err)
	}
}
// proc is currently unused in this example.
type proc struct {
	id     int
	errors int
}

// withSignal returns a copy of ctx that is cancelled on SIGINT or SIGTERM.
func withSignal(ctx context.Context) context.Context {
	ctx, cf := context.WithCancel(ctx)
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
		<-sigchan
		cf()
	}()
	return ctx
}
// consume is mostly copied from the confluent example; it polls messages for
// the group and keeps a count of how often each offset has been seen.
func consume(ctx context.Context) error {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":         broker,
		"group.id":                  group,
		"auto.offset.reset":         "earliest",
		"enable.auto.commit":        true,
		"enable.auto.offset.store":  false,
		"log.connection.close":      false,
		"max.partition.fetch.bytes": int(mb * 5),
	})
	if err != nil {
		return err
	}
	defer consumer.Close()

	// seenOffsets is keyed by offset only, so the same offset seen on two
	// different partitions also shows up as a "duplicate" below.
	seenOffsets := map[int64]int{}
	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		return err
	}
	fmt.Println("subscribed to", topic)
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			ev := consumer.Poll(1000)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				tp := e.TopicPartition
				fmt.Printf("<- %d\n", tp.Offset)
				if count := seenOffsets[int64(tp.Offset)]; count > 0 {
					fmt.Printf("duplicated message (%v) at offset %d (x%d)\n", i, tp.Offset, count)
				}
				seenOffsets[int64(tp.Offset)]++
				// After a number of poll iterations, try to trigger the
				// behaviour and see what we get.
				if i == 15 {
					// Seek's second argument is a timeout in milliseconds,
					// not an offset: this seeks back to tp.Offset (the
					// message just received), so messages from here on are
					// delivered again.
					if seekErr := consumer.Seek(tp, 100); seekErr != nil {
						return seekErr
					}
					time.Sleep(1 * time.Second)
					// Store the offset of the next message for the
					// auto-committer (enable.auto.offset.store is false).
					tp.Offset++
					_, err := consumer.StoreOffsets([]kafka.TopicPartition{tp})
					if err != nil {
						return err
					}
				}
				// fmt.Printf("%% Message on %s:\n%s\n",
				// 	e.TopicPartition, string(e.Value))
				// if e.Headers != nil {
				// 	fmt.Printf("%% Headers: %v\n", e.Headers)
				// }
			case kafka.Error:
				// Errors should generally be considered informational; the
				// client will try to automatically recover. In this example
				// we terminate if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					return e
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}
}
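// logCommitted is a hypothetical helper, not part of the original gist: it
// could be called from consume() after the Seek/StoreOffsets experiment to
// inspect where the group's committed offsets actually ended up. It only
// relies on Consumer.Assignment and Consumer.Committed.
func logCommitted(c *kafka.Consumer) {
	assigned, err := c.Assignment()
	if err != nil {
		log.Println(err)
		return
	}
	committed, err := c.Committed(assigned, 5000)
	if err != nil {
		log.Println(err)
		return
	}
	for _, tp := range committed {
		log.Printf("committed %s[%d] -> %v", *tp.Topic, tp.Partition, tp.Offset)
	}
}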
func produce(count int) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Close()

	// Flush waits for outstanding delivery reports. Reports routed to a
	// per-message delivery channel have to be read by the application (done
	// below); without that, Flush just runs into its timeout (the "weird?"
	// behaviour noted in the original gist). See produceViaEvents below for
	// the variant that uses the default Events() channel instead.
	delivery := make(chan kafka.Event, 1)
	go func() {
		for i := 0; i < count; i++ {
			value := []byte(fmt.Sprint(i))
			err := p.Produce(&kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
				Value:          value,
			}, delivery)
			if err != nil {
				log.Println(err)
			}
		}
	}()
	for i := 0; i < count; i++ {
		<-delivery
	}

	waitForMs := 10000
	p.Flush(waitForMs)
}
func createTopic() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	result, err := a.CreateTopics(
		ctx,
		[]kafka.TopicSpecification{{
			Topic:             topic,
			NumPartitions:     topicPartitions,
			ReplicationFactor: topicReplicationFactor,
		}},
		kafka.SetAdminOperationTimeout(30*time.Second))
	if err != nil {
		log.Println(err)
		return
	}
	// Per-topic errors (e.g. "topic already exists" on a re-run) are reported
	// in the results rather than as an error from CreateTopics.
	for _, res := range result {
		fmt.Println(res)
	}
	waitForTopic(topic)
}
// waitForTopic blocks until metadata for the topic is available.
func waitForTopic(topic string) {
	ac, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer ac.Close()
	for ok := false; !ok; ok = topicExists(topic, ac, 1*time.Second) {
	}
}
func topicExists(topic string, ac *kafka.AdminClient, timeout time.Duration) bool {
	timeoutMs := int(timeout / time.Millisecond)
	md, err := ac.GetMetadata(&topic, false, timeoutMs)
	if err != nil {
		log.Println(err)
		return false
	}
	// Requesting metadata for a specific topic returns an entry for it even
	// when the topic does not exist yet, so checking for the map key alone
	// always succeeds. Check the per-topic error and partition count as well.
	t, exists := md.Topics[topic]
	return exists && t.Error.Code() == kafka.ErrNoError && len(t.Partitions) > 0
}
// drainTopic consumes all existing messages for the topic/consumer group and
// commits the resulting offsets, so subsequent consumers in the same group
// will not see those messages.
func drainTopic(topic string) {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()

	// Get metadata via the admin client (lists the topics and their
	// partition IDs).
	md, err := a.GetMetadata(&topic, false, 10000)
	if err != nil {
		log.Println(err)
		return
	}
	c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker, "group.id": group})
	if err != nil {
		log.Println(err)
		return
	}
	defer c.Close()

	// Construct kafka.TopicPartition values from the partition metadata.
	partitions := []kafka.TopicPartition{}
	for topic, metadata := range md.Topics {
		topic := topic // copy so &topic below does not alias the loop variable
		for _, partition := range metadata.Partitions {
			// Note: the high watermark is the offset the next produced
			// message will get, not the offset of the last message produced,
			// which makes it awkward to pin down "the latest message" in a
			// dynamic environment. Here it is only used as the starting
			// point for draining, so the consumer does not have to discard
			// every message ever produced.
			_, hw, err := c.QueryWatermarkOffsets(topic, partition.ID, 10000)
			if err != nil {
				log.Println(err)
				return
			}
			// Partitions to be assigned to the consumer in the next step.
			partitions = append(partitions, kafka.TopicPartition{
				Topic:     &topic,
				Partition: partition.ID,
				Offset:    kafka.Offset(hw),
			})
		}
	}
	// Try to assign the partitions to this consumer.
	// (Would this steal them from other consumers? Might help SAT tests.)
	err = c.Assign(partitions)
	if err != nil {
		log.Println(err)
		return
	}

	// Consume until there is nothing to receive for 200ms.
	for ; err == nil; _, err = c.ReadMessage(200 * time.Millisecond) {
	}

	committedPartitions, err := c.CommitOffsets(partitions)
	if err != nil {
		log.Println(err)
		return
	}
	// Leave nicely.
	err = c.Unassign()
	if err != nil {
		log.Println(err)
		return
	}

	log.Println("=== Committed partitions:")
	var offsetTotal int64
	for _, p := range committedPartitions {
		log.Println(p.Partition, p.Offset)
		offsetTotal += int64(p.Offset)
	}
	// The difference between this total and the one from the previous run
	// should equal the number of messages emitted since then.
	fmt.Println(offsetTotal)
}
// deleteTopic doesn't work reliably, at least not with the versions used here
// (topic deletion also requires delete.topic.enable=true on the brokers), so
// it is not called from main.
func deleteTopic() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	result, err := a.DeleteTopics(ctx, []string{topic}, kafka.SetAdminOperationTimeout(30*time.Second))
	if err != nil {
		log.Println(err)
		return
	}

	// Poll metadata until the topic disappears.
	for {
		t := topic
		log.Println("get metadata")
		metadata, err := a.GetMetadata(&t, false, 10000)
		if err != nil {
			log.Println(err)
			return
		}
		if len(metadata.Topics) == 0 {
			break
		}
		log.Printf("got metadata for %d topic(s)", len(metadata.Topics))
		for topic, md := range metadata.Topics {
			log.Println(topic)
			for _, p := range md.Partitions {
				log.Println(topic, p.ID, p.Error, p.Leader, p.Replicas, p.Isrs)
			}
			time.Sleep(1 * time.Second)
		}
	}
	for _, res := range result {
		fmt.Println(res)
	}
}
// publishTestMessages is an unused stub.
func publishTestMessages() error {
	return nil
}
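To run the example, point it at a three-broker cluster reachable on localhost:9092-9094 (or adjust the broker, topic and group variables) and build it as a normal Go main package, e.g. go run . in the gist's directory.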