Skip to content

Instantly share code, notes, and snippets.

@mgnisia
Created March 15, 2022 13:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mgnisia/a7217d5c7c3f7d0b16e7aec6fa595dfb to your computer and use it in GitHub Desktop.
Save mgnisia/a7217d5c7c3f7d0b16e7aec6fa595dfb to your computer and use it in GitHub Desktop.
Reset Kafka Consumer Group to a specific offset
package main
import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
	log "github.com/sirupsen/logrus"
)
// Config bundles the Kafka connection settings this program works with.
type Config struct {
	// KafkaBroker lists the broker addresses to connect to,
	// e.g. "localhost:9092".
	KafkaBroker []string
	// GroupID names the consumer group; Topic names the topic.
	GroupID, Topic string
}
// main resets the committed offsets of a consumer group on a local broker
// and exits with a non-zero status if any step fails.
func main() {
	config := Config{
		KafkaBroker: []string{"localhost:9092"},
		GroupID:     "some-consumer-LoadTest",
		Topic:       "LoadTest",
	}
	// Partitions 0..partitions-1 of the topic are reset to targetOffset.
	const partitions = 3
	const targetOffset int64 = 10

	// All work happens in resetOffsets so that its deferred cleanup runs
	// before Fatal terminates the process.
	if err := resetOffsets(context.Background(), config, partitions, targetOffset); err != nil {
		log.WithError(err).Fatal("Resetting consumer group offsets failed")
	}
}

// resetOffsets logs the offsets currently committed by config.GroupID for
// partitions 0..partitions-1 of config.Topic, then joins the group and
// commits offset for each of those partitions.
//
// It returns the first error encountered, including errors the broker
// reports for the whole response or for an individual partition.
func resetOffsets(ctx context.Context, config Config, partitions int, offset int64) error {
	client := kafka.Client{
		Addr: kafka.TCP(config.KafkaBroker...),
	}

	ids := make([]int, partitions)
	for i := range ids {
		ids[i] = i
	}

	// Only the initial read is bounded by a timeout; joining the group below
	// uses the caller's context, as before.
	fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	current, err := client.OffsetFetch(fetchCtx, &kafka.OffsetFetchRequest{
		GroupID: config.GroupID,
		Topics:  map[string][]int{config.Topic: ids},
	})
	if err != nil {
		return fmt.Errorf("fetching committed offsets for group %q: %w", config.GroupID, err)
	}
	if current.Error != nil {
		return fmt.Errorf("fetching committed offsets for group %q: %w", config.GroupID, current.Error)
	}
	for _, p := range current.Topics[config.Topic] {
		if p.Error != nil {
			return fmt.Errorf("fetching committed offset for %s/%d: %w", config.Topic, p.Partition, p.Error)
		}
		log.WithFields(log.Fields{
			"topic":     config.Topic,
			"partition": p.Partition,
			"offset":    p.CommittedOffset,
		}).Info("Current committed offset")
	}

	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:                config.GroupID,
		Topics:            []string{config.Topic},
		Brokers:           config.KafkaBroker,
		HeartbeatInterval: 2 * time.Second,
		RebalanceTimeout:  2 * time.Second,
		RetentionTime:     time.Hour,
	})
	if err != nil {
		return fmt.Errorf("creating consumer group %q: %w", config.GroupID, err)
	}
	defer func() {
		if cerr := group.Close(); cerr != nil {
			log.WithError(cerr).Warn("Closing consumer group failed")
		}
	}()

	// The commit request below needs the generation ID and member ID that
	// are handed out when joining the group.
	gen, err := group.Next(ctx)
	if err != nil {
		return fmt.Errorf("joining consumer group %q: %w", config.GroupID, err)
	}

	commits := make([]kafka.OffsetCommit, 0, len(ids))
	for _, id := range ids {
		commits = append(commits, kafka.OffsetCommit{Partition: id, Offset: offset})
	}

	committed, err := client.OffsetCommit(ctx, &kafka.OffsetCommitRequest{
		GroupID:      config.GroupID,
		GenerationID: int(gen.ID),
		MemberID:     gen.MemberID,
		Topics:       map[string][]kafka.OffsetCommit{config.Topic: commits},
	})
	if err != nil {
		return fmt.Errorf("committing offsets for group %q: %w", config.GroupID, err)
	}
	// A successful round trip can still carry per-partition failures.
	for topic, results := range committed.Topics {
		for _, r := range results {
			if r.Error != nil {
				return fmt.Errorf("committing offset for %s/%d: %w", topic, r.Partition, r.Error)
			}
		}
	}

	log.WithFields(log.Fields{
		"group":      config.GroupID,
		"topic":      config.Topic,
		"partitions": partitions,
		"offset":     offset,
	}).Info("Offsets committed")
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment