Skip to content

Instantly share code, notes, and snippets.

@khous
Created April 27, 2020 22:49
Show Gist options
  • Save khous/97bc76fb78eca412e7b0ab4bfbf32c07 to your computer and use it in GitHub Desktop.
Save khous/97bc76fb78eca412e7b0ab4bfbf32c07 to your computer and use it in GitHub Desktop.
// TODO extract swoosh to constant
package kafka_pubsub
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
pubsub "github.com/dapr/components-contrib/pubsub"
kafka_pubsub "github.com/dapr/components-contrib/pubsub/kafka"
"math/rand"
"strconv"
"testing"
"time"
//sarama "github.com/Shopify/sarama"
logger "github.com/dapr/dapr/pkg/logger"
"github.com/stretchr/testify/require"
)
// initComponent builds the Kafka pubsub component under test and initializes
// it against the broker at localhost:9092 with consumer ID "ding" and
// authentication disabled. The test is failed immediately if Init returns an
// error.
func initComponent(t *testing.T) pubsub.PubSub {
	log := logger.NewLogger("pubsublogger")
	log.SetOutputLevel(logger.DebugLevel)

	properties := map[string]string{
		"consumerID":   "ding",
		"brokers":      "localhost:9092",
		"authRequired": "false",
	}

	component := kafka_pubsub.NewKafka(log)
	require.NoError(t, component.Init(pubsub.Metadata{Properties: properties}))
	return component
}
// bootstrapKafka resets the "swoosh" topic on the broker at localhost:9092 so
// the test starts without leftover messages: it deletes the topic, pauses
// briefly, and then creates the topic again with a single partition and a
// replication factor of 1.
//
// Any connection failure, transport error, or per-topic error reported by the
// broker fails the test immediately. The one tolerated per-topic error is
// "unknown topic" on deletion, which simply means there was nothing to flush.
func bootstrapKafka(t *testing.T) {
	const (
		topic          = "swoosh"
		requestTimeout = 10 * time.Second
	)

	config := sarama.NewConfig()
	config.Version = sarama.V2_3_0_0
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	require.NoError(t, err)
	// Best-effort cleanup: a close error cannot affect the outcome of the test.
	defer func() { _ = client.Close() }()

	brokers := client.Brokers()
	require.NotEmpty(t, brokers, "client returned no brokers")
	broker := brokers[0]

	// The broker may already have been opened; only other failures are fatal.
	if err := broker.Open(config); err != nil && err != sarama.ErrAlreadyConnected {
		require.NoError(t, err)
	}
	// Best-effort cleanup, same reasoning as for the client above.
	defer func() { _ = broker.Close() }()

	connected, err := broker.Connected()
	require.NoError(t, err)
	require.True(t, connected, "broker not connected")

	deleteRes, err := broker.DeleteTopics(&sarama.DeleteTopicsRequest{
		Version: 0,
		Topics:  []string{topic},
		Timeout: requestTimeout,
	})
	require.NoError(t, err)
	fmt.Printf("Topic Deletion Response: %v\n", deleteRes)
	for name, code := range deleteRes.TopicErrorCodes {
		// A topic that does not exist yet is fine: there is nothing to flush.
		if code != sarama.ErrNoError && code != sarama.ErrUnknownTopicOrPartition {
			require.Failf(t, "topic deletion failed", "topic %q: %v", name, code)
		}
	}

	// Pause between deleting and re-creating the topic.
	time.Sleep(3 * time.Second)

	createRes, err := broker.CreateTopics(&sarama.CreateTopicsRequest{
		Version: 0,
		TopicDetails: map[string]*sarama.TopicDetail{
			topic: {
				NumPartitions:     1,
				ReplicationFactor: 1,
			},
		},
		Timeout: requestTimeout,
	})
	require.NoError(t, err)
	fmt.Printf("Topic Creation Response: %v\n", createRes)
	for name, topicErr := range createRes.TopicErrors {
		require.Equal(t, sarama.ErrNoError, topicErr.Err, "creating topic %q", name)
	}
}
// subscribeToTestTopic subscribes kafkaPubsub to topic with a handler that
// forwards every delivered message onto a channel with a buffer of one.
// It returns that channel together with the error reported by Subscribe.
// The handler itself always returns nil.
func subscribeToTestTopic(kafkaPubsub pubsub.PubSub, topic string) (chan *pubsub.NewMessage, error) {
	received := make(chan *pubsub.NewMessage, 1)

	forward := func(m *pubsub.NewMessage) error {
		fmt.Println("Received a message, sending msg to output channel")
		received <- m
		fmt.Println("Output channel has received the message")
		return nil
	}

	request := pubsub.SubscribeRequest{Topic: topic}
	return received, kafkaPubsub.Subscribe(request, forward)
}
// publishTestMessage publishes a JSON object of the form { "message": <n>}
// to topic, where <n> is the decimal rendering of message, and returns the
// error reported by Publish.
func publishTestMessage(kafkaPubsub pubsub.PubSub, message int, topic string) error {
	payload := `{ "message": ` + strconv.Itoa(message) + `}`
	request := pubsub.PublishRequest{
		Topic: topic,
		Data:  []byte(payload),
	}
	return kafkaPubsub.Publish(&request)
}
// pubsubMessage is the JSON shape of the test payload: an object whose
// "message" key holds an integer.
type pubsubMessage struct {
Message int `json:"message"`
}
// verifySendAndReceiveMessage publishes one random integer to topic through
// kafkaPubsub and fails the test unless a message carrying the same integer,
// tagged with the same topic, arrives on c.
//
// A random payload lets the test confirm that the message received is the one
// just sent without augmenting the public pubsub interface.
//
// The test fails immediately if publishing returns an error, if no message
// arrives within the timeout, or if the received payload is not valid JSON.
func verifySendAndReceiveMessage(t *testing.T, kafkaPubsub pubsub.PubSub, c chan *pubsub.NewMessage, topic string) {
	// Upper bound on the wait for delivery, so a lost message fails the test
	// instead of hanging it.
	const receiveTimeout = 30 * time.Second

	message := rand.Int()

	// Buffered so the publishing goroutine can always finish, even when this
	// function has already returned or failed.
	publishResult := make(chan error, 1)
	go func() {
		fmt.Printf("Publishing %v\n", message)
		publishResult <- publishTestMessage(kafkaPubsub, message, topic)
		fmt.Println("Published")
	}()

	fmt.Println("Waiting for channel dinger")
	var (
		recvdMessage   *pubsub.NewMessage
		pendingPublish <-chan error = publishResult
		timeout                     = time.After(receiveTimeout)
	)
	for received := false; !received; {
		select {
		case err := <-pendingPublish:
			// Assertions must run on the test goroutine, so the publish error
			// is checked here rather than inside the publisher.
			require.NoError(t, err, "publishing to topic %q", topic)
			// A nil channel is never ready, which disables this case from now on.
			pendingPublish = nil
		case recvdMessage = <-c:
			received = true
		case <-timeout:
			require.FailNow(t, "timed out waiting for message",
				"nothing received for topic %q within %v", topic, receiveTimeout)
		}
	}
	require.NotNil(t, recvdMessage)

	var msg pubsubMessage
	require.NoError(t, json.Unmarshal(recvdMessage.Data, &msg))
	require.Equal(t, topic, recvdMessage.Topic)
	require.Equal(t, message, msg.Message)
}
// TestKafka is an integration test that needs a Kafka broker listening on
// localhost:9092. It resets the "swoosh" topic, subscribes to "swoosh" and
// "foof", then performs 100 publish/receive round trips on "swoosh" followed
// by a single round trip on "foof".
func TestKafka(t *testing.T) {
	const (
		primaryTopic   = "swoosh"
		secondaryTopic = "foof"
		roundTrips     = 100
	)

	bootstrapKafka(t)
	component := initComponent(t)

	fmt.Println("Subscribing")
	// NOTE(review): the channel returned by the first subscription is dropped;
	// only the channel from the second subscription is ever read, for both
	// topics. Presumably the component routes every topic to the most recently
	// registered handler — confirm against the Kafka component's Subscribe.
	_, err := subscribeToTestTopic(component, primaryTopic)
	require.NoError(t, err)
	messages, err := subscribeToTestTopic(component, secondaryTopic)
	require.NoError(t, err)

	// Each round trip publishes exactly one message and waits for it to be
	// consumed before the next one is sent, so only one message is in flight
	// at a time.
	for remaining := roundTrips; remaining > 0; remaining-- {
		verifySendAndReceiveMessage(t, component, messages, primaryTopic)
	}
	verifySendAndReceiveMessage(t, component, messages, secondaryTopic)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment