Skip to content

Instantly share code, notes, and snippets.

@mariusneo
Created August 6, 2019 04:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mariusneo/b7a09f68b423cca86c7b610fa36fdb9c to your computer and use it in GitHub Desktop.
Save mariusneo/b7a09f68b423cca86c7b610fa36fdb9c to your computer and use it in GitHub Desktop.
Simple Kafka Streams Word Count Topology Test written in Kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.equalTo
import org.junit.Test
import java.util.Arrays
import java.util.Locale
import java.util.Properties
/**
 * Unit test example using [TopologyTestDriver].
 *
 * Builds a word-count topology (see [topology]), pipes a handful of single-word
 * records through it and verifies every record written to the output topic,
 * both the word (key) and its running count (value).
 */
class TopologyTest {
    private val recordFactory = ConsumerRecordFactory(Serdes.String().serializer(), Serdes.String().serializer())
    private val props = properties(APPLICATION_ID, "localhost:9092")

    /**
     * Feeds six words into the topology and expects one count update per input record,
     * in input order, and nothing else on the output topic.
     */
    @Test
    fun test() {
        val inputValues = listOf("A", "A", "B", "C", "B", "A")
        val expectedList = listOf(
            ProducerRecord(OUTPUT_TOPIC, "a", 1L),
            ProducerRecord(OUTPUT_TOPIC, "a", 2L),
            ProducerRecord(OUTPUT_TOPIC, "b", 1L),
            ProducerRecord(OUTPUT_TOPIC, "c", 1L),
            ProducerRecord(OUTPUT_TOPIC, "b", 2L),
            ProducerRecord(OUTPUT_TOPIC, "a", 3L),
        )

        // `use` closes the driver even when an assertion fails, so its state stores
        // are released instead of leaking into later tests.
        TopologyTestDriver(topology(INPUT_TOPIC, OUTPUT_TOPIC), props).use { driver ->
            // Feed input (records carry no key; the topology re-keys by word).
            for (inputValue in inputValues) {
                driver.pipeInput(recordFactory.create(INPUT_TOPIC, null, inputValue))
            }

            // Retrieve output and compare key AND value with the expectation.
            for (expected in expectedList) {
                val actual = checkNotNull(readNextOutput(driver)) {
                    "missing output record, expected ${expected.key()}=${expected.value()}"
                }
                assertThat(actual.key(), equalTo(expected.key()))
                assertThat(actual.value(), equalTo(expected.value()))
            }

            // The topology must not have produced anything beyond the expected records.
            assertThat("unexpected extra output record", readNextOutput(driver) == null)
        }
    }

    /**
     * Builds the word-count topology.
     *
     * Each value read from [inputTopic] is lower-cased, split on single spaces and
     * re-grouped by word; the running count per word is written to [outputTopic]
     * with a `String` key and a `Long` value.
     *
     * @param inputTopic topic the text lines are read from
     * @param outputTopic topic the `(word, count)` updates are written to
     * @return the built [Topology]
     */
    fun topology(inputTopic: String, outputTopic: String): Topology {
        val builder = StreamsBuilder()
        val counts = builder.stream<String, String>(inputTopic)
            .flatMapValues { value ->
                // Locale.ROOT keeps the produced keys independent of the JVM's default locale.
                value.toLowerCase(Locale.ROOT)
                    .split(WORD_SEPARATOR)
                    .dropLastWhile { it.isEmpty() }
            }
            .groupBy { _, word -> word }
            .count()
        // need to override value serde to Long type
        counts.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()))
        return builder.build()
    }

    /**
     * Creates the streams configuration handed to the test driver.
     *
     * @param appId value for `application.id`
     * @param bootstrap value for `bootstrap.servers`
     * @return properties with String default serdes, record caching set to 0 bytes
     *   and `auto.offset.reset` set to `earliest`
     */
    fun properties(appId: String, bootstrap: String): Properties = Properties().apply {
        this[StreamsConfig.APPLICATION_ID_CONFIG] = appId
        this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrap
        // Cache size 0: the test expects one output record per input record.
        this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
        this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    }

    /** Reads the next record from the output topic, or `null` when none is left. */
    private fun readNextOutput(driver: TopologyTestDriver): ProducerRecord<String, Long>? =
        driver.readOutput(OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.Long().deserializer())

    companion object {
        private const val APPLICATION_ID = "test-app-id"
        private const val INPUT_TOPIC = "inputTopic"
        private const val OUTPUT_TOPIC = "outputTopic"

        // Compiled once instead of once per processed record.
        private val WORD_SEPARATOR = " ".toRegex()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment