Created
April 22, 2020 22:11
-
-
Save khous/6c9be7a796fe7576780fd134d817a88e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// TODO: extract the "swoosh" topic name into a constant.
package kafka_pubsub | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/Shopify/sarama" | |
pubsub "github.com/dapr/components-contrib/pubsub" | |
kafka_pubsub "github.com/dapr/components-contrib/pubsub/kafka" | |
"math/rand" | |
"strconv" | |
"testing" | |
"time" | |
//sarama "github.com/Shopify/sarama" | |
logger "github.com/dapr/dapr/pkg/logger" | |
"github.com/stretchr/testify/require" | |
) | |
func initComponent(t *testing.T) (pubsub.PubSub) { | |
kafkaPubsub := kafka_pubsub.NewKafka(logger.NewLogger("pubsublogger")) | |
err := kafkaPubsub.Init(pubsub.Metadata{ | |
Properties: map[string]string{ | |
"consumerID": "ding", | |
"brokers": "localhost:9092", | |
"authRequired": "false", | |
}, | |
}) | |
require.NoError(t, err) | |
return kafkaPubsub | |
} | |
// Setup topic in kafka. First delete topic in order to flush it of messages, the create that topic again. | |
func bootstrapKafka(t *testing.T) { | |
config := sarama.NewConfig() | |
config.Version = sarama.V2_3_0_0 | |
client, err := sarama.NewClient([]string{"localhost:9092"}, config) | |
require.NoError(t, err) | |
broker := client.Brokers()[0] | |
broker.Open(config) | |
if connected, err := broker.Connected(); err != nil || !connected{ | |
require.Fail(t, "broker not connected") | |
} | |
deleteRes, err := broker.DeleteTopics(&sarama.DeleteTopicsRequest{ | |
Version: 0, | |
Topics: []string{"swoosh"}, | |
Timeout: 10 * time.Second, | |
}) | |
fmt.Printf("Topic Deletion Response: %v\n", deleteRes) | |
require.NoError(t, err) | |
time.Sleep(3 * time.Second) | |
createRes, err := broker.CreateTopics(&sarama.CreateTopicsRequest{ | |
Version: 0, | |
TopicDetails: map[string]*sarama.TopicDetail{ | |
"swoosh": { | |
NumPartitions: 1, | |
ReplicationFactor: 1, | |
ReplicaAssignment: nil, | |
ConfigEntries: nil, | |
}, | |
}, | |
Timeout: 10 * time.Second, | |
ValidateOnly: false, | |
}) | |
fmt.Printf("Topic Creation Response: %v\n", createRes) | |
require.NoError(t, err) | |
//time.Sleep(3 * time.Second) | |
} | |
func subscribeToTestTopic(kafkaPubsub pubsub.PubSub, topic string) (chan *pubsub.NewMessage, error) { | |
c := make(chan *pubsub.NewMessage, 1) | |
err := kafkaPubsub.Subscribe(pubsub.SubscribeRequest{Topic: topic}, func(msg *pubsub.NewMessage) error { | |
c <- msg | |
//fmt.Println("wg.Done") | |
//wg.Done() | |
return nil | |
}) | |
return c, err | |
} | |
func publishTestMessage(kafkaPubsub pubsub.PubSub, message int) error { | |
return kafkaPubsub.Publish(&pubsub.PublishRequest{ | |
Data: []byte("{ \"message\": " + strconv.Itoa(message) + "}"), | |
//Data: []byte("{\"love\": \"games?\"}"), | |
Topic: "swoosh", | |
}) | |
} | |
// pubsubMessage is the JSON shape of the payload exchanged by this test:
// a single integer stored under the "message" key.
type pubsubMessage struct {
	// Message is the integer carried by the payload.
	Message int `json:"message"`
}
// Let me take no arguments and simply fail the test if I fail to send and then receive the same message | |
func verifySendAndReceiveMessage(t *testing.T, kafkaPubsub pubsub.PubSub, c chan *pubsub.NewMessage) { | |
// grab a random integer, and later compare this int to verify that the message we sent is the same one we recv'd | |
// without doing something like augmenting the public interface for pubsub | |
message := rand.Int() | |
go func() { | |
fmt.Printf("Publishing %v\n", message) | |
_ = publishTestMessage(kafkaPubsub, message) | |
fmt.Println("Published") | |
}() | |
fmt.Println("Done waiting") | |
fmt.Println("Waiting for channel dinger") | |
var msg pubsubMessage | |
recvdMessage := <-c | |
json.Unmarshal(recvdMessage.Data, &msg) | |
require.Equal(t, "swoosh", recvdMessage.Topic) | |
require.Equal(t, message, msg.Message) | |
} | |
func TestKafka(t *testing.T) { | |
bootstrapKafka(t) | |
kafkaPubsub := initComponent(t) | |
fmt.Println("Subscribing") | |
// Test sanity check, verify that returned offset is 0 | |
// This ensures that our test is running against a clean environment | |
outputChannel, err := subscribeToTestTopic(kafkaPubsub, "swoosh") | |
require.NoError(t, err) | |
_, err = subscribeToTestTopic(kafkaPubsub, "foof") | |
require.NoError(t, err) | |
// Because of the way the function below is structured, messages are guaranteed to be received in order | |
// We are publishing one message at a time and waiting for its consumption with the output channel | |
for i := 0; i < 100; i++ { | |
verifySendAndReceiveMessage(t, kafkaPubsub, outputChannel) | |
} | |
//test that our kill routine works | |
//time.Sleep(100 * time.Second) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment