Skip to content

Instantly share code, notes, and snippets.

@lewapek
Last active September 21, 2018 19:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lewapek/934ddb5384f05b008b50246632276d4c to your computer and use it in GitHub Desktop.
Save lewapek/934ddb5384f05b008b50246632276d4c to your computer and use it in GitHub Desktop.
Scalatest AsyncFunSuite vs FunSuite
// async suite
test("listen for one message when subscribed to topics regex") {
val producer = KafkaProducer[String,String](producerCfg, io)
val consumer = KafkaConsumerObservable[String,String](consumerCfg, topicsRegex).executeOn(io)
// Publishing one message
val flow = for {
_ <- producer.send(topicMatchingRegex, "test-message")
first <- consumer.take(1).map(_.value()).firstL
} yield {
first
}
flow.runAsync.map { result =>
assert(result === "test-message2")
}
}
//sync
test("listen for one message when subscribed to topics regex") {
val producer = KafkaProducer[String,String](producerCfg, io)
val consumer = KafkaConsumerObservable[String,String](consumerCfg, topicsRegex).executeOn(io)
try {
// Publishing one message
val send = producer.send(topicMatchingRegex, "test-message")
Await.result(send.runAsync, 30.seconds)
val first = consumer.take(1).map(_.value()).firstL
val result = Await.result(first.runAsync, 30.seconds)
assert(result === "test-message")
}
finally {
Await.result(producer.close().runAsync, Duration.Inf)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment