Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created June 24, 2016 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/442a7f6a493a828b528d838d5fb28c64 to your computer and use it in GitHub Desktop.
Save l15k4/442a7f6a493a828b528d838d5fb28c64 to your computer and use it in GitHub Desktop.
package com.example
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.{Second, Seconds, Span}
import scala.concurrent.duration._
class KafkaPlayground extends BaseSuite with KafkaSupport with AkkaSupport with BeforeAndAfterEach {
implicit val futurePatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Second))
val KafkaHost = "localhost"
val zkUtils = ZkUtils(s"$KafkaHost:2181", 2000, 2000, false)
override def beforeEach(): Unit =
try {
createCustomTopics(1, Seq("topic1"), zkUtils)
Thread.sleep(1500)
} finally super.beforeEach()
override def afterEach(): Unit =
try {
deleteCustomTopics(Seq("topic1"), zkUtils)
Thread.sleep(1500)
} finally super.afterEach()
override def afterAll(): Unit = try zkUtils.close() finally super.afterAll()
"temp" in {
val start = System.currentTimeMillis()
val msgCount = 10
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(s"$KafkaHost:9092")
def consumerSettings(topic: String, clientId: String) =
ConsumerSettings(system, new StringDeserializer, new StringDeserializer, Set("topic1"))
.withBootstrapServers(s"$KafkaHost:9092")
.withClientId(clientId)
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Source(1 to msgCount)
.map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
.runWith(Producer.plainSink(producerSettings))
val sf =
Consumer.committableSource(consumerSettings("topic1", "client1"))
.take(5)
.runWith(Sink.seq)
.flatMap { result1 =>
Thread.sleep(1000)
Consumer.committableSource(consumerSettings("topic1", "client1"))
.take(5)
.runWith(Sink.seq).map(result2 => result1 -> result2)
}
whenReady(sf) { case (recs, recs2) =>
println(
s"""
|It took ${System.currentTimeMillis() - start} ms to pipe :
|recs ${recs.map(_.value).mkString(",")}
|recs2 ${recs2.map(_.value).mkString(",")}
|messages...
|""".stripMargin)
}
}
}
--------------------------------------------------------
It took 2182 ms to pipe :
recs 1,2,3,4,5
recs2 1,2,3,4,5
messages...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment