Last active
June 6, 2023 13:29
-
-
Save krsoninikhil/3ed02b79d7614dac450dcffc44fa979e to your computer and use it in GitHub Desktop.
Kafka client to create topic and ACLs for a SCRAM user using IAM auth on publicly accessible MSK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"crypto/tls" | |
"crypto/x509" | |
"fmt" | |
"log" | |
"time" | |
"github.com/aws/aws-sdk-go/aws/credentials" | |
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" | |
"github.com/pkg/errors" | |
"github.com/segmentio/kafka-go" | |
"github.com/segmentio/kafka-go/sasl/aws_msk_iam" | |
) | |
func main() { | |
ctx := context.Background() | |
topic := "test_topic" | |
client := Client(ctx, topic) | |
CreateTopic(ctx, *client, topic, 1, 1) | |
CreateACL(ctx, *client, topic, "testuser") | |
} | |
func Client(ctx context.Context, topic string) { | |
brokers := []string{ | |
"b-1-public.ssupexternal.wulvz8.c4.kafka.ap-south-1.amazonaws.com:9198", | |
"b-2-public.ssupexternal.wulvz8.c4.kafka.ap-south-1.amazonaws.com:9198", | |
} | |
accessKeyId, secretAccessKey := os.Getenv("AWS_ACCESS_KEY"), os.Getenv("AWS_SECRET_KEY") | |
creds := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "") | |
mechanism := &aws_msk_iam.Mechanism{Region: "ap-south-1", Signer: sigv4.NewSigner(creds)} | |
sharedTransport := &kafka.Transport{SASL: mechanism, TLS: getTLSConfig(), DialTimeout: 120 * time.Second} | |
client := &kafka.Client{ | |
Addr: kafka.TCP(brokers...), | |
Timeout: 10 * time.Second, | |
Transport: sharedTransport, | |
} | |
return client | |
} | |
func CreateTopic(ctx context.Context, client kafka.Client, topic string, partitions int, replication int) error { | |
res, err := client.CreateTopics(ctx, &kafka.CreateTopicsRequest{ | |
Topics: []kafka.TopicConfig{ | |
{ | |
Topic: topic, | |
NumPartitions: partitions, | |
ReplicationFactor: replication, | |
}, | |
}, | |
}) | |
if err != nil || res.Errors[topic] != nil { | |
if err == nil { | |
err = res.Errors[topic] | |
} | |
return errors.Wrap(err, "Could not create topic") | |
} | |
return nil | |
} | |
func CreateACL(ctx context.Context, client kafka.Client, topic string, username string) error { | |
res, err := client.CreateACLs(ctx, &kafka.CreateACLsRequest{ | |
ACLs: []kafka.ACLEntry{ | |
newACL(username, topic, kafka.ACLOperationTypeRead), | |
newACL(username, topic, kafka.ACLOperationTypeWrite), | |
}, | |
}) | |
if err != nil || isError(res.Errors) { | |
if err == nil { | |
err = res.Errors[0] | |
} | |
return errors.Wrap(err, "error creating ACL") | |
} | |
return nil | |
} | |
func newACL(user string, topic string, op kafka.ACLOperationType) kafka.ACLEntry { | |
return kafka.ACLEntry{ | |
ResourceType: kafka.ResourceTypeTopic, | |
ResourceName: topic, | |
ResourcePatternType: kafka.PatternTypeLiteral, | |
Principal: fmt.Sprintf("User:%s", user), | |
Host: "*", | |
Operation: op, | |
PermissionType: kafka.ACLPermissionTypeAllow, | |
} | |
} | |
func getTLSConfig() *tls.Config { | |
caPEM := `-----BEGIN CERTIFICATE----- | |
MIIEkjCCA3qgAwIBAgITBn+USionzfP6wq4rAfkI7rnExjANBgkqhkiG9w0BAQsF | |
ADCBmDELMAkGA1UEBhMCVVMxEDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNj | |
b3R0c2RhbGUxJTAjBgNVBAoTHFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4x | |
OzA5BgNVBAMTMlN0YXJmaWVsZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1 | |
dGhvcml0eSAtIEcyMB4XDTE1MDUyNTEyMDAwMFoXDTM3MTIzMTAxMDAwMFowOTEL | |
MAkGA1UEBhMCVVMxDzANBgNVBAoTBkFtYXpvbjEZMBcGA1UEAxMQQW1hem9uIFJv | |
b3QgQ0EgMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALJ4gHHKeNXj | |
ca9HgFB0fW7Y14h29Jlo91ghYPl0hAEvrAIthtOgQ3pOsqTQNroBvo3bSMgHFzZM | |
9O6II8c+6zf1tRn4SWiw3te5djgdYZ6k/oI2peVKVuRF4fn9tBb6dNqcmzU5L/qw | |
IFAGbHrQgLKm+a/sRxmPUDgH3KKHOVj4utWp+UhnMJbulHheb4mjUcAwhmahRWa6 | |
VOujw5H5SNz/0egwLX0tdHA114gk957EWW67c4cX8jJGKLhD+rcdqsq08p8kDi1L | |
93FcXmn/6pUCyziKrlA4b9v7LWIbxcceVOF34GfID5yHI9Y/QCB/IIDEgEw+OyQm | |
jgSubJrIqg0CAwEAAaOCATEwggEtMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/ | |
BAQDAgGGMB0GA1UdDgQWBBSEGMyFNOy8DJSULghZnMeyEE4KCDAfBgNVHSMEGDAW | |
gBScXwDfqgHXMCs4iKK4bUqc8hGRgzB4BggrBgEFBQcBAQRsMGowLgYIKwYBBQUH | |
MAGGImh0dHA6Ly9vY3NwLnJvb3RnMi5hbWF6b250cnVzdC5jb20wOAYIKwYBBQUH | |
MAKGLGh0dHA6Ly9jcnQucm9vdGcyLmFtYXpvbnRydXN0LmNvbS9yb290ZzIuY2Vy | |
MD0GA1UdHwQ2MDQwMqAwoC6GLGh0dHA6Ly9jcmwucm9vdGcyLmFtYXpvbnRydXN0 | |
LmNvbS9yb290ZzIuY3JsMBEGA1UdIAQKMAgwBgYEVR0gADANBgkqhkiG9w0BAQsF | |
AAOCAQEAYjdCXLwQtT6LLOkMm2xF4gcAevnFWAu5CIw+7bMlPLVvUOTNNWqnkzSW | |
MiGpSESrnO09tKpzbeR/FoCJbM8oAxiDR3mjEH4wW6w7sGDgd9QIpuEdfF7Au/ma | |
eyKdpwAJfqxGF4PcnCZXmTA5YpaP7dreqsXMGz7KQ2hsVxa81Q4gLv7/wmpdLqBK | |
bRRYh5TmOTFffHPLkIhqhBGWJ6bt2YFGpn6jcgAKUj6DiAdjd4lpFw85hdKrCEVN | |
0FE6/V1dN2RMfjCyVSRCnTawXZwXgWHxyvkQAiSr6w10kY17RSlQOYiypok1JR4U | |
akcjMS9cmvqtmg5iUaQqqcT5NJ0hGA== | |
-----END CERTIFICATE-----` | |
caCertPool := x509.NewCertPool() | |
if ok := caCertPool.AppendCertsFromPEM([]byte(caPEM)); !ok { | |
log.Fatalln("Could not append cert to pool") | |
} | |
return &tls.Config{ | |
RootCAs: caCertPool, | |
} | |
} | |
func getAuthMechanism(awsConf Config) sasl.Mechanism { | |
creds := credentials.NewStaticCredentials(awsConf.AccessKeyID, awsConf.SecretAccessKey, "") | |
return &aws_msk_iam.Mechanism{ | |
Region: awsConf.Region, | |
Signer: sigv4.NewSigner(creds), | |
} | |
} | |
func isError(errs []error) bool { | |
for _, e := range errs { | |
if e != nil { | |
return true | |
} | |
} | |
return false | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment