Skip to content

Instantly share code, notes, and snippets.

@Rafailong
Created February 28, 2019 15:50
Show Gist options
  • Save Rafailong/43e010e8fd8d0ef277d76687befd4136 to your computer and use it in GitHub Desktop.
Save Rafailong/43e010e8fd8d0ef277d76687befd4136 to your computer and use it in GitHub Desktop.
Integration Tests in sbt projects.

This is a direct way to implement integrations tests in a sbt project.

Requirements

  1. Probando.scala and DockerKafkaService.scala must be in project-root/src/it/scala folder. In this example I located them in test-project/src/it/scala/me.rafaavila
  2. As you can see in build.sbt there is only one project which is based on sbt IntegrationTest default setting.

Inspiration

I did read this ovo energy blog post then I decided to go for it but with a dependecy the project I'm workin on right now has (Kafka).

name := "it-test-docker"
version := "0.1"
scalaVersion := "2.12.8"
scalacOptions += "-Ypartial-unification"
libraryDependencies ++= Seq(
"com.whisk" %% "docker-testkit-scalatest" % "0.9.8",
"com.whisk" %% "docker-testkit-impl-spotify" % "0.9.8")
libraryDependencies += "com.ovoenergy" %% "fs2-kafka" % "0.19.3"
Defaults.itSettings
lazy val root = project.in(file(".")).configs(IntegrationTest)
package me.rafaavila
import com.whisk.docker._
trait DockerKafkaService extends DockerKit {
def KafkaAdvertisedPort = 9092
val ZookeeperDefaultPort = 2181
lazy val kafkaContainer = DockerContainer("spotify/kafka")
.withPorts(KafkaAdvertisedPort -> Some(KafkaAdvertisedPort), ZookeeperDefaultPort -> None)
.withEnv(s"ADVERTISED_PORT=$KafkaAdvertisedPort", s"ADVERTISED_HOST=${dockerExecutor.host}")
.withReadyChecker(DockerReadyChecker.LogLineContains("kafka entered RUNNING state"))
abstract override def dockerContainers: List[DockerContainer] =
kafkaContainer :: super.dockerContainers
}
Probando
package me.rafaavila
import cats.effect._
import cats.implicits._
import com.whisk.docker.impl.spotify.DockerKitSpotify
import com.whisk.docker.scalatest.DockerTestKit
import org.scalatest._
import org.scalatest.time._
import fs2.kafka._
import fs2._
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class Probando extends FlatSpec with Matchers with DockerKafkaService with DockerTestKit with DockerKitSpotify {
implicit val pc = PatienceConfig(Span(20, Seconds), Span(1, Second))
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
IO.pure(record.key -> record.value)
"kafka container" should "be ready" in {
isContainerReady(kafkaContainer).futureValue shouldBe true
}
"kafka container" should "working correctly" in {
val consumerSettings =
ConsumerSettings[String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
val producerSettings =
ProducerSettings[String, String]
.withBootstrapServers("localhost:9092")
val toProduce = (0 until 100).map(n => s"key-$n" -> s"value->$n")
val produced =
(for {
producer <- producerStream[IO].using(producerSettings)
_ <- Stream.eval(IO(producer.toString should startWith("KafkaProducer$")))
message <- Stream.chunk(Chunk.seq(toProduce).map {
case passthrough @ (key, value) =>
ProducerMessage.one(ProducerRecord("topic", key, value), passthrough)
})
batched <- Stream.eval(producer.producePassthrough(message)).buffer(toProduce.size)
passthrough <- Stream.eval(batched)
} yield passthrough).compile.toVector.unsafeRunSync()
produced should contain theSameElementsAs toProduce
info(s"produced: ${produced.mkString(",")}")
Thread.sleep(10000)
val consumed =
consumerStream[IO]
.using(consumerSettings)
.evalTap(_.subscribeTo("topic"))
.evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void)
.flatMap(_.stream)
.take(produced.size.toLong)
.map(message => message.record.key -> message.record.value)
.compile
.toVector
.unsafeRunSync
consumed should contain theSameElementsAs produced
info(s"consumed: ${consumed.mkString(",")}")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment