Skip to content

Instantly share code, notes, and snippets.

@owulveryck
Created October 17, 2019 15:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save owulveryck/894ad063441cebce8dfe78137ec2383e to your computer and use it in GitHub Desktop.
Save owulveryck/894ad063441cebce8dfe78137ec2383e to your computer and use it in GitHub Desktop.
kafka connect from Go with certificates
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"os"
"testing"
"github.com/segmentio/kafka-go"
)
var (
dialer *kafka.Dialer
broker string = "kafka-server:9002"
topic string = "demo-topic"
)
// TestMain to do the plumbing
func TestMain(m *testing.M) {
cer, err := tls.LoadX509KeyPair("/home/ubuntu/.kafka/access.cert", "/home/ubuntu/.kafka/access.key")
if err != nil {
log.Fatal(err)
}
ca, err := ioutil.ReadFile("/home/ubuntu/.kafka/ca.pem")
if err != nil {
log.Fatal(err)
}
rootCAs := x509.NewCertPool()
ok := rootCAs.AppendCertsFromPEM(ca)
if !ok {
log.Fatal("Cannot append ca")
}
dialer = &kafka.Dialer{
ClientID: "go-bastion",
TLS: &tls.Config{
Certificates: []tls.Certificate{cer},
RootCAs: rootCAs,
},
}
os.Exit(m.Run())
}
func TestProducer(t *testing.T) {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{broker},
Topic: topic,
Dialer: dialer,
})
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
)
if err != nil {
t.Fatal(err)
}
w.Close()
}
func TestConsumer(t *testing.T) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{broker},
Topic: topic,
Dialer: dialer,
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42)
//for {
m, err := r.ReadMessage(context.Background())
if err != nil {
t.Fatal(err)
//break
}
t.Logf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
//}
r.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment