Skip to content

Instantly share code, notes, and snippets.

@krsoninikhil
Last active June 6, 2023 13:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krsoninikhil/3ed02b79d7614dac450dcffc44fa979e to your computer and use it in GitHub Desktop.
Save krsoninikhil/3ed02b79d7614dac450dcffc44fa979e to your computer and use it in GitHub Desktop.
Kafka client to create topic and ACLs for a SCRAM user using IAM auth on publicly accessible MSK
package main
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/aws/aws-sdk-go/aws/credentials"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/pkg/errors"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)
// main builds the Kafka client, creates the "test_topic" topic with one
// partition and a replication factor of one, and then creates read/write ACLs
// on that topic for the user "testuser".
//
// Either step failing is fatal: the error is logged and the process exits
// with a non-zero status instead of being silently discarded.
func main() {
	ctx := context.Background()
	topic := "test_topic"
	client := Client(ctx, topic)

	if err := CreateTopic(ctx, *client, topic, 1, 1); err != nil {
		log.Fatalln(err)
	}
	if err := CreateACL(ctx, *client, topic, "testuser"); err != nil {
		log.Fatalln(err)
	}
}
func Client(ctx context.Context, topic string) {
brokers := []string{
"b-1-public.ssupexternal.wulvz8.c4.kafka.ap-south-1.amazonaws.com:9198",
"b-2-public.ssupexternal.wulvz8.c4.kafka.ap-south-1.amazonaws.com:9198",
}
accessKeyId, secretAccessKey := os.Getenv("AWS_ACCESS_KEY"), os.Getenv("AWS_SECRET_KEY")
creds := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "")
mechanism := &aws_msk_iam.Mechanism{Region: "ap-south-1", Signer: sigv4.NewSigner(creds)}
sharedTransport := &kafka.Transport{SASL: mechanism, TLS: getTLSConfig(), DialTimeout: 120 * time.Second}
client := &kafka.Client{
Addr: kafka.TCP(brokers...),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}
return client
}
// CreateTopic asks the cluster to create a single topic with the given
// partition count and replication factor.
//
// It returns a wrapped error when the request itself fails, or when the
// response carries a non-nil error entry for the topic; otherwise nil.
func CreateTopic(ctx context.Context, client kafka.Client, topic string, partitions int, replication int) error {
	const failureMsg = "Could not create topic"

	topicCfg := kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     partitions,
		ReplicationFactor: replication,
	}
	req := &kafka.CreateTopicsRequest{Topics: []kafka.TopicConfig{topicCfg}}

	resp, reqErr := client.CreateTopics(ctx, req)
	if reqErr != nil {
		// Transport/protocol failure: the response is not consulted.
		return errors.Wrap(reqErr, failureMsg)
	}
	if topicErr := resp.Errors[topic]; topicErr != nil {
		// The request went through but the response reports an error for this topic.
		return errors.Wrap(topicErr, failureMsg)
	}
	return nil
}
// CreateACL creates two ACL entries granting the given user read and write
// access on the topic.
//
// It returns a wrapped error when the request itself fails, or the first
// non-nil entry found in the response's error list; otherwise nil.
func CreateACL(ctx context.Context, client kafka.Client, topic string, username string) error {
	res, err := client.CreateACLs(ctx, &kafka.CreateACLsRequest{
		ACLs: []kafka.ACLEntry{
			newACL(username, topic, kafka.ACLOperationTypeRead),
			newACL(username, topic, kafka.ACLOperationTypeWrite),
		},
	})
	if err != nil {
		return errors.Wrap(err, "error creating ACL")
	}

	// Report the first entry that actually failed. Always taking index 0 would
	// wrap a nil error (and so return nil) whenever only a later ACL failed.
	for _, aclErr := range res.Errors {
		if aclErr != nil {
			return errors.Wrap(aclErr, "error creating ACL")
		}
	}
	return nil
}
// newACL builds an "allow" ACL entry for a literal topic name, applying to the
// principal "User:<user>" from any host ("*"), for the single operation op.
func newACL(user string, topic string, op kafka.ACLOperationType) kafka.ACLEntry {
	var entry kafka.ACLEntry

	// What the rule applies to: exactly this topic, matched literally.
	entry.ResourceType = kafka.ResourceTypeTopic
	entry.ResourceName = topic
	entry.ResourcePatternType = kafka.PatternTypeLiteral

	// Who the rule applies to, and from where.
	entry.Principal = fmt.Sprintf("User:%s", user)
	entry.Host = "*"

	// What is permitted.
	entry.Operation = op
	entry.PermissionType = kafka.ACLPermissionTypeAllow

	return entry
}
// getTLSConfig returns a TLS configuration whose root CA pool contains only
// the single PEM-encoded certificate embedded below.
//
// If the embedded PEM cannot be added to the pool the process is terminated
// via log.Fatalln.
func getTLSConfig() *tls.Config {
	// The PEM body must stay flush-left: it is a raw string, so any added
	// indentation would become part of the certificate text.
	const rootCAPEM = `-----BEGIN CERTIFICATE-----
MIIEkjCCA3qgAwIBAgITBn+USionzfP6wq4rAfkI7rnExjANBgkqhkiG9w0BAQsF
ADCBmDELMAkGA1UEBhMCVVMxEDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNj
b3R0c2RhbGUxJTAjBgNVBAoTHFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4x
OzA5BgNVBAMTMlN0YXJmaWVsZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1
dGhvcml0eSAtIEcyMB4XDTE1MDUyNTEyMDAwMFoXDTM3MTIzMTAxMDAwMFowOTEL
MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv
b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj
ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM
9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw
IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6
VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L
93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm
jgSubJrIqg0CAwEAAaOCATEwggEtMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/
BAQDAgGGMB0GA1UdDgQWBBSEGMyFNOy8DJSULghZnMeyEE4KCDAfBgNVHSMEGDAW
gBScXwDfqgHXMCs4iKK4bUqc8hGRgzB4BggrBgEFBQcBAQRsMGowLgYIKwYBBQUH
MAGGImh0dHA6Ly9vY3NwLnJvb3RnMi5hbWF6b250cnVzdC5jb20wOAYIKwYBBQUH
MAKGLGh0dHA6Ly9jcnQucm9vdGcyLmFtYXpvbnRydXN0LmNvbS9yb290ZzIuY2Vy
MD0GA1UdHwQ2MDQwMqAwoC6GLGh0dHA6Ly9jcmwucm9vdGcyLmFtYXpvbnRydXN0
LmNvbS9yb290ZzIuY3JsMBEGA1UdIAQKMAgwBgYEVR0gADANBgkqhkiG9w0BAQsF
AAOCAQEAYjdCXLwQtT6LLOkMm2xF4gcAevnFWAu5CIw+7bMlPLVvUOTNNWqnkzSW
MiGpSESrnO09tKpzbeR/FoCJbM8oAxiDR3mjEH4wW6w7sGDgd9QIpuEdfF7Au/ma
eyKdpwAJfqxGF4PcnCZXmTA5YpaP7dreqsXMGz7KQ2hsVxa81Q4gLv7/wmpdLqBK
bRRYh5TmOTFffHPLkIhqhBGWJ6bt2YFGpn6jcgAKUj6DiAdjd4lpFw85hdKrCEVN
0FE6/V1dN2RMfjCyVSRCnTawXZwXgWHxyvkQAiSr6w10kY17RSlQOYiypok1JR4U
akcjMS9cmvqtmg5iUaQqqcT5NJ0hGA==
-----END CERTIFICATE-----`

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM([]byte(rootCAPEM)) {
		log.Fatalln("Could not append cert to pool")
	}

	cfg := new(tls.Config)
	cfg.RootCAs = pool
	return cfg
}
// getAuthMechanism builds an AWS MSK IAM SASL mechanism from static
// credentials: the access key ID, secret access key and region are taken from
// awsConf, and the session token is left empty.
//
// NOTE(review): neither the Config type nor the sasl package is declared or
// imported in the visible part of this file — confirm where they come from.
func getAuthMechanism(awsConf Config) sasl.Mechanism {
	staticCreds := credentials.NewStaticCredentials(awsConf.AccessKeyID, awsConf.SecretAccessKey, "")
	signer := sigv4.NewSigner(staticCreds)

	mech := &aws_msk_iam.Mechanism{Region: awsConf.Region, Signer: signer}
	return mech
}
// isError reports whether errs contains at least one non-nil error.
// A nil or empty slice yields false.
func isError(errs []error) bool {
	for i := 0; i < len(errs); i++ {
		if errs[i] == nil {
			continue
		}
		return true
	}
	return false
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment