Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/ea357226917aad56ede30e3ef36271ba to your computer and use it in GitHub Desktop.
Save dacr/ea357226917aad56ede30e3ef36271ba to your computer and use it in GitHub Desktop.
Kafka features test using embedded kafka. / published by https://github.com/dacr/code-examples-manager #ce57d71f-05b2-461d-82b7-c634fc73d80c/387a3a2fceeb5f73245eb883de0e05fd2e09e346
// summary : Kafka features test using embedded kafka.
// keywords : scala, scalatest, kafka, embedded-kafka
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : ce57d71f-05b2-461d-82b7-c634fc73d80c
// created-on : 2018-10-10T08:45:12+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- $file
import $ivy.`org.scalatest::scalatest:3.2.9`
import $ivy.`io.github.embeddedkafka::embedded-kafka:2.8.0`
import $ivy.`org.apache.logging.log4j:log4j-api:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-core:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.14.1`
import org.scalatest._, flatspec._, matchers._, OptionValues._
import io.github.embeddedkafka._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
org.apache.logging.log4j.core.config.Configurator.setRootLevel(org.apache.logging.log4j.Level.ERROR)
class KafkaFeaturesTest extends AnyFlatSpec with should.Matchers with EmbeddedKafka {
override val suiteName = "KafkaFeaturesTest"
// -------------------------------------------------------------------------------------------------------------------
"Kafka embedded" should "be able to dynamically choose an available port" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
info(s"Kafka is listening on port ${actualConfig.kafkaPort} for testing purposes")
implicit val serializer = new StringSerializer
implicit val deserializer = new StringDeserializer
publishToKafka("news", "Hello world !")
consumeFirstMessageFrom("news") should include regex "(?i)hello"
}
}
// -------------------------------------------------------------------------------------------------------------------
it should "be able to create customized configuration" in {
val size = (20*1024*1024).toString
val brokerConfig = Map(
"min.insync.replicas" -> "1",
"replica.fetch.max.bytes" -> size,
"message.max.bytes" -> size
)
val producerConfig = Map(
"max.request.size" -> size
)
val consumerConfig = Map(
"max.partition.fetch.bytes" -> size
)
val userDefinedConfig = EmbeddedKafkaConfig(
kafkaPort = 0,
zooKeeperPort = 0,
customBrokerProperties = brokerConfig,
customProducerProperties = producerConfig,
customConsumerProperties = consumerConfig
)
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
val topicName = "truc"
val topicConfig = Map(
"retention.ms" -> "31536000000" // One Year
)
createCustomTopic(
topic = topicName,
topicConfig = topicConfig,
partitions = 1,
replicationFactor = 1
)
implicit val serializer = new StringSerializer
implicit val deserializer = new StringDeserializer
val msg = ("12345678" * (20 * 1024 * 1024 / 16)) // a 20Mb messages
publishToKafka(topicName, msg)
consumeFirstMessageFrom(topicName) should include regex "(?i)12345678"
}
}
// -------------------------------------------------------------------------------------------------------------------
it should "be possible to directly use the java API" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
implicit val serializer = new StringSerializer
implicit val deserializer = new StringDeserializer
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
val port = actualConfig.kafkaPort
// TO BE CONTINUED
}
}
}
org.scalatest.tools.Runner.main(Array("-oDF", "-s", classOf[KafkaFeaturesTest].getName))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment