Skip to content

Instantly share code, notes, and snippets.

@davideicardi
Last active September 30, 2020 19:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davideicardi/a46b2cb0fd078125ec6684d32c73566d to your computer and use it in GitHub Desktop.
Save davideicardi/a46b2cb0fd078125ec6684d32c73566d to your computer and use it in GitHub Desktop.
Simple Kafka Streams Testing
val kafkaVersion = "2.4.0"
val scalaTestVersion = "3.2.2"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % "it,test",
"org.scalatest" %% "scalatest-funspec" % scalaTestVersion % "it,test",
"org.scalatest" %% "scalatest-shouldmatchers" % scalaTestVersion % "it,test",
)
import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{StreamsConfig, Topology}
import Serdes._
class SimpleTopology(
val bootstrapServers: String,
val inputTopic: String,
val outputTopic: String,
val applicationId: String = "SimpleTopology",
) {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
def createTopology(): Topology = {
val streamBuilder = new StreamsBuilder
val stream: KStream[String, String] =
streamBuilder.stream(commandsTopic)
stream
.mapValues((_, v) => v.toUpperCase())
.to(snapshotTopic)
streamBuilder.build()
}
}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.TopologyTestDriver
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import scala.jdk.CollectionConverters._
class SimpleTopologySpec extends AnyFunSpec with Matchers {
private val stringSer = new StringSerializer()
private val stringDes = new StringDeserializer()
describe("Running topology") {
it("generate events") {
val target = new SimpleTopology(
"dummy:9999",
"input",
"output",
)
val topology = target.createTopology()
val driver = new TopologyTestDriver(topology, target.properties)
val inputTopic = driver.createInputTopic(target.inputTopic, stringSer, stringSer)
val outputTopic = driver.createOutputTopic(target.outputTopic, stringDes, stringDes)
inputTopic.pipeInput("key1", "value1")
val results = outputTopic.readKeyValuesToMap().asScala
results should be(Map("key1" -> "VALUE1"))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment