Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mariusneo/bfa1b3612caab9b482ff10278bebe311 to your computer and use it in GitHub Desktop.
Save mariusneo/bfa1b3612caab9b482ff10278bebe311 to your computer and use it in GitHub Desktop.
Simple word-count Kafka Streams topology test that inspects the timestamp of each consumer record passing through the topology
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.equalTo
import org.junit.Test
import java.util.Arrays
import java.util.Locale
import java.util.Properties
/**
 * Unit test example using [TopologyTestDriver]
 * that makes use of the [TransformerSupplier] for inspecting the consumer record metadata of
 * the Apache Kafka records going through the topology.
 *
 * The topology under test is a word count: each input value is lower-cased, split on single
 * spaces, and the occurrences of every word are counted and written to [OUTPUT_TOPIC] as
 * `(word, count)` records.
 */
class TopologyTransformTest {
    private val recordFactory = ConsumerRecordFactory(Serdes.String().serializer(), Serdes.String().serializer())
    private val props = properties(APPLICATION_ID, "localhost:9092")

    @Test
    fun test() {
        val driver = TopologyTestDriver(topology(INPUT_TOPIC, OUTPUT_TOPIC), props)
        // The driver holds local resources (e.g. the count store's state); always close it,
        // even when an assertion fails, so nothing leaks into subsequent tests.
        try {
            val inputValues = listOf("A", "A", "B", "C", "B", "A")
            // Record caching is disabled in [properties], so every count update is emitted.
            val expectedCounts = listOf(
                "a" to 1L,
                "a" to 2L,
                "b" to 1L,
                "c" to 1L,
                "b" to 2L,
                "a" to 3L,
            )

            // Feed input
            inputValues.forEach { line -> driver.pipeInput(recordFactory.create(INPUT_TOPIC, null, line)) }

            // Retrieve and verify output: both the word and its running count must match.
            val keyDeserializer = Serdes.String().deserializer()
            val valueDeserializer = Serdes.Long().deserializer()
            for ((expectedWord, expectedCount) in expectedCounts) {
                val actual = requireNotNull(driver.readOutput(OUTPUT_TOPIC, keyDeserializer, valueDeserializer)) {
                    "Expected output record ($expectedWord, $expectedCount) but $OUTPUT_TOPIC had no more records"
                }
                assertThat(actual.key(), equalTo(expectedWord))
                assertThat(actual.value(), equalTo(expectedCount))
            }

            // The topology must not emit anything beyond the expected records.
            val leftover = driver.readOutput(OUTPUT_TOPIC, keyDeserializer, valueDeserializer)
            assertThat("Unexpected extra output record: $leftover", leftover == null)
        } finally {
            driver.close()
        }
    }

    /**
     * Builds the word-count topology.
     *
     * Records from [inputTopic] are first passed through [inspectRecordMetadata], then each value is
     * lower-cased with [Locale.ROOT] (locale-independent) and split on single spaces. Null values
     * contribute no words. The per-word counts are written to [outputTopic] with String keys and
     * Long values.
     *
     * @param inputTopic topic the text lines are read from
     * @param outputTopic topic the `(word, count)` records are written to
     * @return the built [Topology]
     */
    fun topology(inputTopic: String, outputTopic: String): Topology {
        val builder = StreamsBuilder()
        val source = inspectRecordMetadata(builder.stream<String, String>(inputTopic))
        val counts = source
            .flatMapValues { value ->
                // flatMapValues iterates the returned Iterable, so a null result would fail;
                // a null value simply yields no words.
                value
                    ?.toLowerCase(Locale.ROOT)
                    ?.split(' ')
                    // Mirrors java.lang.String.split, which drops trailing empty strings.
                    ?.dropLastWhile { it.isEmpty() }
                    ?: emptyList()
            }
            .groupBy { _, word -> word }
            .count()
        // The counts are Longs, so the default (String) value serde must be overridden for the sink.
        counts.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()))
        return builder.build()
    }

    /**
     * Forwards every record of [source] unchanged, printing its value together with the record
     * timestamp reported by the [ProcessorContext] to standard output.
     */
    private fun inspectRecordMetadata(source: KStream<String?, String?>): KStream<String?, String?> =
        source.transform(TransformerSupplier {
            object : Transformer<String?, String?, KeyValue<String?, String?>> {
                private var processorContext: ProcessorContext? = null

                override fun init(context: ProcessorContext?) {
                    processorContext = context
                }

                override fun transform(key: String?, value: String?): KeyValue<String?, String?> {
                    println("The entry with the value $value was written at the timestamp ${processorContext?.timestamp()} in Apache Kafka")
                    return KeyValue(key, value)
                }

                override fun close() = Unit
            }
        })

    /**
     * Creates the Kafka Streams configuration used by the test.
     *
     * Key and value default serdes are String, record caching is disabled
     * (`CACHE_MAX_BYTES_BUFFERING_CONFIG = 0`) and the consumer offset reset policy is `earliest`.
     *
     * @param appId value for [StreamsConfig.APPLICATION_ID_CONFIG]
     * @param bootstrap value for [StreamsConfig.BOOTSTRAP_SERVERS_CONFIG]
     * @return a new [Properties] instance holding the configuration
     */
    fun properties(appId: String, bootstrap: String): Properties = Properties().apply {
        this[StreamsConfig.APPLICATION_ID_CONFIG] = appId
        this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrap
        this[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
        this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
        this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    }

    companion object {
        private const val APPLICATION_ID = "test-app-id"
        private const val INPUT_TOPIC = "inputTopic"
        private const val OUTPUT_TOPIC = "outputTopic"
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment