package akka.kafka.scaladsl
import java.util.concurrent.TimeUnit
import java.util.{Properties, UUID}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.testkit.TestKit
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
class KafkaPlayground extends TestKit(ActorSystem("KafkaPlayground")) with FlatSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures {
implicit val materializer = ActorMaterializer()
implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)
val topic = UUID.randomUUID().toString
override def beforeAll(): Unit = try super.beforeAll() finally {
EmbeddedKafka.createCustomTopic(topic, new Properties())
override def afterAll(): Unit = try super.afterAll() finally {
it should "consume" in {
val msgCount = 100
val producerSettings =
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
def consumerSettings(topic: String, clientId: String) =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer, Set(topic))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producer = producerSettings.createKafkaProducer()
for (i <- 1 to msgCount)
producer.send(new ProducerRecord(topic, 0, null: Array[Byte], "test"))
producer.close(60, TimeUnit.SECONDS)
val consumingStart = System.currentTimeMillis()
val sf =
Consumer.plainSource(consumerSettings(topic, UUID.randomUUID().toString))
implicit val patienceConfig = PatienceConfig(
timeout = scaled(Span(60, Seconds)),
interval = scaled(Span(150, Millis))
whenReady(sf) { result =>
println("took " + (System.currentTimeMillis() - consumingStart))
