Skip to content

Instantly share code, notes, and snippets.

@kciesielski
Last active July 7, 2016 17:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kciesielski/8a07f1562d25e497bdb5b22d1c71b55c to your computer and use it in GitHub Desktop.
Save kciesielski/8a07f1562d25e497bdb5b22d1c71b55c to your computer and use it in GitHub Desktop.
package akka.kafka.scaladsl
import java.util.concurrent.TimeUnit
import java.util.{Properties, UUID}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.TestKit
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
/**
 * Playground spec that round-trips messages through an embedded Kafka broker.
 *
 * The suite starts an embedded Kafka/ZooKeeper pair before the tests, creates a
 * uniquely named topic, publishes a batch of records with a plain Kafka producer
 * and then reads them back through `Consumer.plainSource`, printing how long the
 * consumption took.
 */
class KafkaPlayground
    extends TestKit(ActorSystem("KafkaPlayground"))
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  // Implicits carry explicit types so that their resolution does not depend on inference order.
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

  /** Random topic name, so repeated runs never read records left over by a previous run. */
  val topic: String = UUID.randomUUID().toString

  // Derived from the embedded broker configuration so the port is declared in exactly one place.
  private val bootstrapServers: String = s"localhost:${config.kafkaPort}"

  /** Starts the embedded broker and creates the test topic once the parent setup has succeeded. */
  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
    EmbeddedKafka.createCustomTopic(topic, new Properties())
  }

  /**
   * Releases everything the suite allocated: the parent teardown runs first, then the
   * embedded broker is stopped and the actor system created in the constructor is
   * terminated. Each step sits in a `finally` so a failure in one does not leak the others.
   */
  override def afterAll(): Unit =
    try super.afterAll()
    finally {
      try EmbeddedKafka.stop()
      finally TestKit.shutdownActorSystem(system)
    }

  it should "consume" in {
    val msgCount = 100

    val producerSettings =
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    // A fresh group id together with "earliest" makes the consumer start at the beginning of the topic.
    def consumerSettings(topic: String, clientId: String) =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(bootstrapServers)
        .withClientId(clientId)
        .withGroupId(UUID.randomUUID().toString)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    println("Writing")
    val producer = producerSettings.createKafkaProducer()
    try {
      (1 to msgCount).foreach { _ =>
        // Every record goes to partition 0 with a null key (the Kafka client API accepts null keys).
        producer.send(new ProducerRecord(topic, 0, null: Array[Byte], "test"))
      }
    } finally {
      // Always close the producer, even when a send throws, so its resources are not leaked.
      producer.close(60, TimeUnit.SECONDS)
    }

    println("Reading")
    val consumingStart = System.currentTimeMillis()
    val sf =
      Consumer.plainSource(consumerSettings(topic, UUID.randomUUID().toString))
        .take(msgCount.toLong)
        .runWith(Sink.seq)

    // Shadows the inherited default patience: consuming from a freshly started broker can be slow.
    implicit val patienceConfig: PatienceConfig = PatienceConfig(
      timeout = scaled(Span(60, Seconds)),
      interval = scaled(Span(150, Millis))
    )

    whenReady(sf) { result =>
      println(s"took ${System.currentTimeMillis() - consumingStart}")
      // Without an assertion the test would pass regardless of what was consumed.
      result.size shouldBe msgCount
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment