-
-
Save hrafzali/c2f50e7b957030dab13693eec1e49c13 to your computer and use it in GitHub Desktop.
A sample code for reproducing the kafka ProcessorTopologyTestDriver issue with .map and .groupByKey.count
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Properties, UUID} | |
import org.apache.kafka.common.serialization.Serdes | |
import org.apache.kafka.streams.{KeyValue, StreamsConfig} | |
import org.apache.kafka.streams.kstream.KStreamBuilder | |
import org.apache.kafka.test.ProcessorTopologyTestDriver | |
// When using ProcessorTopologyTestDriver the topology does not produce any results. | |
object ProcessorTopology { | |
def main(args: Array[String]): Unit = { | |
val inputTopic = "input" | |
val stateStore = "count" | |
val inputs = Seq[(String, Integer)](("A-1", 1), ("A-2", 2), ("B-1", 3)) | |
val props = new Properties | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString) | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") | |
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "0") | |
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG , "0") | |
val builder = new KStreamBuilder | |
builder.stream(Serdes.String, Serdes.Integer, inputTopic) | |
.map((k, v) => new KeyValue(k.split("-")(0), v)) | |
.groupByKey(Serdes.String, Serdes.Integer) | |
.count(stateStore) | |
val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore) | |
inputs.foreach { | |
case (key, value) => driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer) | |
} | |
driver.getKeyValueStore(stateStore).all().forEachRemaining(println) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Properties, UUID} | |
import org.apache.kafka.common.serialization.Serdes | |
import org.apache.kafka.streams.{KeyValue, StreamsConfig} | |
import org.apache.kafka.streams.kstream.KStreamBuilder | |
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore | |
import org.apache.kafka.test.KStreamTestDriver | |
// When using KStreamTestDriver the topology produces the expected results. | |
object ProcessorTopology2 { | |
def main(args: Array[String]): Unit = { | |
val inputTopic = "input" | |
val stateStore = "count" | |
val inputs = Seq[(String, Integer)](("A-1", 1), ("A-2", 2), ("B-1", 3)) | |
val props = new Properties | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString) | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") | |
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "0") | |
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG , "0") | |
val builder = new KStreamBuilder | |
builder.stream(Serdes.String, Serdes.Integer, inputTopic) | |
.map((k, v) => new KeyValue(k.split("-")(0), v)) | |
.groupByKey(Serdes.String, Serdes.Integer) | |
.count(stateStore) | |
val driver = new KStreamTestDriver(builder, new File("./db")) | |
inputs.foreach { case (key, value) => driver.process(inputTopic, key, value)} | |
driver.flushState() | |
driver.context().getStateStore(stateStore).asInstanceOf[ReadOnlyKeyValueStore[String, Integer]].all().forEachRemaining(println) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment