Created
April 27, 2020 22:49
-
-
Save khous/97bc76fb78eca412e7b0ab4bfbf32c07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// TODO: extract the "swoosh" topic name into a shared constant.
package kafka_pubsub | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/Shopify/sarama" | |
pubsub "github.com/dapr/components-contrib/pubsub" | |
kafka_pubsub "github.com/dapr/components-contrib/pubsub/kafka" | |
"math/rand" | |
"strconv" | |
"testing" | |
"time" | |
//sarama "github.com/Shopify/sarama" | |
logger "github.com/dapr/dapr/pkg/logger" | |
"github.com/stretchr/testify/require" | |
) | |
func initComponent(t *testing.T) (pubsub.PubSub) { | |
l := logger.NewLogger("pubsublogger") | |
l.SetOutputLevel(logger.DebugLevel) | |
kafkaPubsub := kafka_pubsub.NewKafka(l) | |
err := kafkaPubsub.Init(pubsub.Metadata{ | |
Properties: map[string]string{ | |
"consumerID": "ding", | |
"brokers": "localhost:9092", | |
"authRequired": "false", | |
}, | |
}) | |
require.NoError(t, err) | |
return kafkaPubsub | |
} | |
// Setup topic in kafka. First delete topic in order to flush it of messages, the create that topic again. | |
func bootstrapKafka(t *testing.T) { | |
config := sarama.NewConfig() | |
config.Version = sarama.V2_3_0_0 | |
client, err := sarama.NewClient([]string{"localhost:9092"}, config) | |
require.NoError(t, err) | |
broker := client.Brokers()[0] | |
broker.Open(config) | |
if connected, err := broker.Connected(); err != nil || !connected{ | |
require.Fail(t, "broker not connected") | |
} | |
deleteRes, err := broker.DeleteTopics(&sarama.DeleteTopicsRequest{ | |
Version: 0, | |
Topics: []string{"swoosh"}, | |
Timeout: 10 * time.Second, | |
}) | |
fmt.Printf("Topic Deletion Response: %v\n", deleteRes) | |
require.NoError(t, err) | |
time.Sleep(3 * time.Second) | |
createRes, err := broker.CreateTopics(&sarama.CreateTopicsRequest{ | |
Version: 0, | |
TopicDetails: map[string]*sarama.TopicDetail{ | |
"swoosh": { | |
NumPartitions: 1, | |
ReplicationFactor: 1, | |
ReplicaAssignment: nil, | |
ConfigEntries: nil, | |
}, | |
}, | |
Timeout: 10 * time.Second, | |
ValidateOnly: false, | |
}) | |
fmt.Printf("Topic Creation Response: %v\n", createRes) | |
require.NoError(t, err) | |
//time.Sleep(3 * time.Second) | |
} | |
func subscribeToTestTopic(kafkaPubsub pubsub.PubSub, topic string) (chan *pubsub.NewMessage, error) { | |
c := make(chan *pubsub.NewMessage, 1) | |
err := kafkaPubsub.Subscribe(pubsub.SubscribeRequest{Topic: topic}, func(msg *pubsub.NewMessage) error { | |
fmt.Println("Received a message, sending msg to output channel") | |
c <- msg | |
fmt.Println("Output channel has received the message") | |
//fmt.Println("wg.Done") | |
//wg.Done() | |
return nil | |
}) | |
return c, err | |
} | |
func publishTestMessage(kafkaPubsub pubsub.PubSub, message int, topic string) error { | |
return kafkaPubsub.Publish(&pubsub.PublishRequest{ | |
Data: []byte("{ \"message\": " + strconv.Itoa(message) + "}"), | |
//Data: []byte("{\"love\": \"games?\"}"), | |
Topic: topic, | |
}) | |
} | |
// pubsubMessage is the JSON payload exchanged by the test: a single integer
// stored under the "message" key.
type pubsubMessage struct {
	// Message is the integer that was published and is expected back.
	Message int `json:"message"`
}
// Let me take no arguments and simply fail the test if I fail to send and then receive the same message | |
func verifySendAndReceiveMessage(t *testing.T, kafkaPubsub pubsub.PubSub, c chan *pubsub.NewMessage, topic string) { | |
// grab a random integer, and later compare this int to verify that the message we sent is the same one we recv'd | |
// without doing something like augmenting the public interface for pubsub | |
message := rand.Int() | |
go func() { | |
fmt.Printf("Publishing %v\n", message) | |
_ = publishTestMessage(kafkaPubsub, message, topic) | |
fmt.Println("Published") | |
}() | |
fmt.Println("Done waiting") | |
fmt.Println("Waiting for channel dinger") | |
var msg pubsubMessage | |
recvdMessage := <-c | |
json.Unmarshal(recvdMessage.Data, &msg) | |
require.Equal(t, topic, recvdMessage.Topic) | |
require.Equal(t, message, msg.Message) | |
} | |
func TestKafka(t *testing.T) { | |
bootstrapKafka(t) | |
kafkaPubsub := initComponent(t) | |
fmt.Println("Subscribing") | |
// Test sanity check, verify that returned offset is 0 | |
// This ensures that our test is running against a clean environment | |
outputChannel, err := subscribeToTestTopic(kafkaPubsub, "swoosh") | |
require.NoError(t, err) | |
outputChannel, err = subscribeToTestTopic(kafkaPubsub, "foof") | |
require.NoError(t, err) | |
// Because of the way the function below is structured, messages are guaranteed to be received in order | |
// We are publishing one message at a time and waiting for its consumption with the output channel | |
for i := 0; i < 100; i++ { | |
verifySendAndReceiveMessage(t, kafkaPubsub, outputChannel, "swoosh") | |
} | |
verifySendAndReceiveMessage(t, kafkaPubsub, outputChannel, "foof") | |
//test that our kill routine works | |
//time.Sleep(100 * time.Second) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment