Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created June 14, 2016 12:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/29c816f3a773c38dc8d594457de8fa26 to your computer and use it in GitHub Desktop.
Save l15k4/29c816f3a773c38dc8d594457de8fa26 to your computer and use it in GitHub Desktop.
package com.example
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.time.{Second, Seconds, Span}
/**
 * Playground suite that publishes a few records to a Kafka broker on `localhost:9092` through
 * the akka-stream-kafka `Producer` sink, reads some of them back through `Consumer.plainSource`
 * and prints how long the round trip took.
 *
 * NOTE(review): `BaseSuite`, `KafkaSupport` and `AkkaSupport` are declared outside this file;
 * presumably they supply the ScalaTest DSL (`in`, `whenReady`), the topic helpers
 * (`createCustomTopics` / `deleteCustomTopics`) and the implicit actor `system` / materializer
 * used below — confirm against their definitions.
 */
class KafkaPlayground extends BaseSuite with KafkaSupport with AkkaSupport {

  /** Patience used by `whenReady`: wait up to 30 seconds, polling the future once per second. */
  implicit val futurePatience: PatienceConfig =
    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Second))

  val KafkaHost = "localhost"

  // `finally` guarantees the parent hook still runs even when topic creation throws.
  // NOTE(review): port 2181 looks like the ZooKeeper endpoint — confirm against KafkaSupport.
  override def beforeAll(): Unit =
    try createCustomTopics(Seq("topic1"), s"$KafkaHost:2181")
    finally super.beforeAll()

  // Same pattern on the way out: the parent hook runs even when topic deletion throws.
  override def afterAll(): Unit =
    try deleteCustomTopics("topic1")
    finally super.afterAll()

  "test" in {
    val start    = System.currentTimeMillis()
    val msgCount = 20

    def producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers(s"$KafkaHost:9092")

    def consumerSettings(topic: String, clientId: String) =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(s"$KafkaHost:9092")
        .withClientId(clientId)
        .withGroupId("group1")
        // The producer below is started before the consumer. With "latest" every record
        // published before the consumer's partitions were assigned would be skipped, so the
        // stream could starve and `whenReady` would time out. "earliest" makes the outcome
        // independent of the start-up order.
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Publish the numbers 1..msgCount as value-only records (no key) to "topic1".
    Source(1 to msgCount)
      .map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
      .runWith(Producer.plainSink(producerSettings))

    // Only the first 10 consumed records are awaited; the stream completes after them.
    val sf =
      Consumer
        .plainSource(consumerSettings("topic1", "client1"))
        .take(10)
        .runWith(Sink.seq)

    whenReady(sf) { result =>
      // The literal is closed with exactly three quotes; a fourth one would become part of the
      // text and print a stray `"` at the end of the report.
      println(
        s"""
           |It took ${System.currentTimeMillis() - start} ms to pipe :
           |${result.mkString(",")}
           |messages...
           |""".stripMargin
      )
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment