Skip to content

Instantly share code, notes, and snippets.

@kai5263499
Last active September 16, 2021 14:47
Show Gist options
  • Save kai5263499/a2d1b5ef856a0dce0084949fa20a162c to your computer and use it in GitHub Desktop.
Create log compacted topic in golang
package main
import (
"context"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/caarlos0/env"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
)
// config holds the runtime settings for this tool, populated from
// environment variables by caarlos0/env using the struct tags below.
type config struct {
	// LogLevel is a logrus level name (e.g. "debug", "info"); parsed in main.
	LogLevel string `env:"LOG_LEVEL" envDefault:"debug"`
	// AWSRegion is the region used for the MSK control-plane client.
	AWSRegion string `env:"AWS_REGION" envDefault:"us-west-2"`
	// MSKARN identifies the MSK cluster whose bootstrap brokers are resolved.
	MSKARN string `env:"MSK_ARN"`
	// RouteTopic is the topic to describe and, if absent, create log-compacted.
	RouteTopic string `env:"ROUTE_TOPIC"`
	// KafkaGroupId is the consumer group id for the throwaway consumer
	// that backs the admin client.
	KafkaGroupId string `env:"KAFKA_GROUP_ID" envDefault:"tmp-worker-1"`
	// MaxDelay is a time.ParseDuration string used as the admin
	// request/operation timeout.
	MaxDelay string `env:"MAX_DELAY" envDefault:"10s"`
}
var (
	// cfg is the process-wide configuration, populated once in main
	// via env.Parse before any other work happens.
	cfg config
)
// main resolves the TLS bootstrap brokers for the MSK cluster named by
// MSK_ARN, connects a Kafka admin client through them, and ensures the
// ROUTE_TOPIC exists as a log-compacted topic (creating it if missing).
// All failures are fatal: this is a one-shot provisioning tool.
func main() {
	var err error
	cfg = config{}
	if err = env.Parse(&cfg); err != nil {
		log.WithError(err).Fatal("error parsing config")
	}

	// Configure logrus from LOG_LEVEL before emitting anything else.
	if level, err := log.ParseLevel(cfg.LogLevel); err != nil {
		log.WithError(err).Fatal("error parsing log level")
	} else {
		log.SetLevel(level)
	}

	mySession := session.Must(session.NewSession())
	// Create an MSK control-plane client for broker discovery.
	svc := kafka.New(mySession, aws.NewConfig().WithRegion(cfg.AWSRegion))

	// Shared structured-log fields carried through every message.
	fields := log.Fields{
		"arn":   cfg.MSKARN,
		"topic": cfg.RouteTopic,
	}
	log.WithFields(fields).Info("created new session")

	params := &kafka.GetBootstrapBrokersInput{
		ClusterArn: aws.String(cfg.MSKARN),
	}
	req, resp := svc.GetBootstrapBrokersRequest(params)
	err = req.Send()
	if err != nil {
		log.WithFields(fields).WithError(err).Fatal("error getting bootstrap brokers")
	}
	// FIX: guard the pointer before dereferencing — AWS SDK string fields
	// are *string and may be nil (e.g. a cluster without TLS endpoints),
	// which previously would have panicked.
	if resp.BootstrapBrokerStringTls == nil {
		log.WithFields(fields).Fatal("cluster returned no TLS bootstrap broker string")
	}
	brokers := *resp.BootstrapBrokerStringTls
	fields["brokers"] = brokers
	log.WithFields(fields).Info("obtained kafka brokers")

	kafkaConfigMap := &ckafka.ConfigMap{
		"bootstrap.servers": brokers,
		"group.id":          cfg.KafkaGroupId,
		"security.protocol": "ssl",
		"auto.offset.reset": "smallest",
	}
	kafkaConsumer, err := ckafka.NewConsumer(kafkaConfigMap)
	if err != nil {
		log.WithFields(fields).WithError(err).Fatal("error creating kafka consumer")
	}
	defer kafkaConsumer.Close()
	log.WithFields(fields).Info("created kafka consumer")

	// The admin client piggybacks on the consumer's connection.
	adminClient, err := ckafka.NewAdminClientFromConsumer(kafkaConsumer)
	if err != nil {
		log.WithFields(fields).WithError(err).Fatal("error creating kafka admin client")
	}
	defer adminClient.Close()

	// FIX: the parse error was previously discarded with `_`; a malformed
	// MAX_DELAY would silently produce a zero admin timeout.
	dur, err := time.ParseDuration(cfg.MaxDelay)
	if err != nil {
		log.WithFields(fields).WithError(err).Fatal("error parsing max delay")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Ask the cluster for the topic's current configuration to decide
	// whether it needs to be created.
	results, err := adminClient.DescribeConfigs(ctx,
		[]ckafka.ConfigResource{{Type: ckafka.ResourceTopic, Name: cfg.RouteTopic}},
		ckafka.SetAdminRequestTimeout(dur))
	if err != nil {
		log.WithFields(fields).WithError(err).Fatal("error describing topic")
	}
	// DescribeConfigs returns one result per requested resource.
	if len(results) != 1 {
		log.WithFields(fields).Fatal("unexpected number of results returned")
	}

	if results[0].Error.Code() == ckafka.ErrUnknownTopicOrPart {
		log.WithFields(fields).Debug("topic not found, creating")
		// Create the topic!
		createResults, err := adminClient.CreateTopics(
			ctx,
			// Multiple topics can be created simultaneously
			// by providing more TopicSpecification structs here.
			// Config enables log compaction with aggressive settings
			// (tiny segments, very low dirty ratio) so compaction
			// kicks in quickly.
			[]ckafka.TopicSpecification{{
				Topic:             cfg.RouteTopic,
				NumPartitions:     4,
				ReplicationFactor: 2,
				Config: map[string]string{
					"cleanup.policy":            "compact",
					"delete.retention.ms":       "100",
					"segment.ms":                "100",
					"min.cleanable.dirty.ratio": "0.01",
				},
			}},
			// Admin options
			ckafka.SetAdminOperationTimeout(dur))
		if err != nil {
			log.WithFields(fields).WithError(err).Fatal("error creating topic")
		}
		spew.Dump("createResults", createResults)
	} else {
		log.WithFields(fields).Debug("topic found")
		spew.Dump("topic config details", results[0].Config)
	}

	log.WithFields(fields).Info("all finished")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment