Skip to content

Instantly share code, notes, and snippets.

@hrafzali
Last active December 1, 2016 13:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hrafzali/c2f50e7b957030dab13693eec1e49c13 to your computer and use it in GitHub Desktop.
Save hrafzali/c2f50e7b957030dab13693eec1e49c13 to your computer and use it in GitHub Desktop.
Sample code for reproducing the Kafka Streams ProcessorTopologyTestDriver issue with .map followed by .groupByKey.count
import java.util.{Properties, UUID}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.test.ProcessorTopologyTestDriver
// When using ProcessorTopologyTestDriver the topology does not produce any results.
/**
 * Reproduction case driven by `ProcessorTopologyTestDriver`.
 *
 * Builds a topology that re-keys each record to the part of its key before
 * the first "-", groups by that new key and counts into the "count" store.
 * Three records are then pushed through the driver and every entry found in
 * the store is printed.
 *
 * NOTE(review): presumably the empty output comes from `map` forcing an
 * internal repartition topic ahead of `groupByKey`, which this driver may not
 * feed back into the topology -- confirm against the Kafka version in use.
 */
object ProcessorTopology {

  /**
   * Assembles the Streams settings handed to the test driver: a random
   * application id, a local bootstrap address, and a commit interval and
   * cache size of "0".
   */
  private def driverSettings(): Properties = {
    val settings = new Properties
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "0")
    settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
    settings
  }

  /**
   * Entry point: builds the topology, processes the sample records and
   * prints the contents of the count store.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sourceTopic = "input"
    val countStore = "count"
    val records = Seq[(String, Integer)](("A-1", 1), ("A-2", 2), ("B-1", 3))

    val topology = new KStreamBuilder
    topology.stream(Serdes.String, Serdes.Integer, sourceTopic)
      // "A-1" and "A-2" both become "A"; "B-1" becomes "B".
      .map((k, v) => new KeyValue(k.split("-")(0), v))
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(countStore)

    val testDriver =
      new ProcessorTopologyTestDriver(new StreamsConfig(driverSettings()), topology, countStore)

    records.foreach { record =>
      testDriver.process(
        sourceTopic,
        record._1,
        record._2,
        Serdes.String.serializer,
        Serdes.Integer.serializer)
    }

    testDriver.getKeyValueStore(countStore).all().forEachRemaining(println)
  }
}
import java.util.{Properties, UUID}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.apache.kafka.test.KStreamTestDriver
// When using KStreamTestDriver the topology produces the expected results.
/**
 * Companion sample driven by `KStreamTestDriver`.
 *
 * Builds a topology that re-keys each record to the part of its key before
 * the first "-", groups by that new key and counts into the "count" store.
 * Three records are pushed through the driver, state is flushed, and every
 * entry of the store is printed.
 */
object ProcessorTopology2 {

  /**
   * Entry point: builds the topology, processes the sample records and
   * prints the contents of the count store.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val inputTopic = "input"
    val stateStore = "count"
    val inputs = Seq[(String, Integer)](("A-1", 1), ("A-2", 2), ("B-1", 3))

    // No Properties/StreamsConfig is built here: the KStreamTestDriver
    // constructor used below only takes the builder and a state directory.
    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      // "A-1" and "A-2" both become "A"; "B-1" becomes "B".
      .map((k, v) => new KeyValue(k.split("-")(0), v))
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)

    // Fully qualified because java.io.File is not among the file's imports.
    val driver = new KStreamTestDriver(builder, new java.io.File("./db"))
    inputs.foreach { case (key, value) => driver.process(inputTopic, key, value) }
    driver.flushState()

    // count() stores Long values, so the store is viewed as String -> Long.
    val counts = driver.context().getStateStore(stateStore)
      .asInstanceOf[ReadOnlyKeyValueStore[String, java.lang.Long]]

    // Store iterators hold underlying resources and must be closed after use.
    val entries = counts.all()
    try entries.forEachRemaining(println)
    finally entries.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment