Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created June 6, 2016 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/8f09b62f41eef019d07051798c495064 to your computer and use it in GitHub Desktop.
Save l15k4/8f09b62f41eef019d07051798c495064 to your computer and use it in GitHub Desktop.
package com.example
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.scalatest.time.{Second, Seconds, Span}
import scala.concurrent.duration._
/**
 * Playground suite that pipes messages through two Kafka topics using akka-stream-kafka
 * and prints how long the round trip took.
 *
 * Flow exercised by the single test:
 *   1. a producer stream writes `msgCount` records to `topic1`,
 *   2. a relay stream consumes `topic1` and republishes every value to `topic2`,
 *   3. a consumer stream collects `msgCount` records from `topic2`.
 *
 * The topics are created before and deleted after the suite via `KafkaSupport`.
 */
class KafkaPlayground extends BaseSuite with KafkaSupport with AkkaSupport {

  /** Patience used by `whenReady`: wait up to 30 seconds, polling once per second. */
  implicit val futurePatience: PatienceConfig =
    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Second))

  // `finally` guarantees the parent hooks run even if topic creation/deletion throws.
  override def beforeAll(): Unit =
    try createCustomTopics("topic1", "topic2")
    finally super.beforeAll()

  override def afterAll(): Unit =
    try deleteCustomTopics("topic1", "topic2")
    finally super.afterAll()

  "publish events from topic to topic" in {
    val start = System.currentTimeMillis()
    val msgCount = 10000
    val bootstrapServers = "localhost:9092"

    val producerSettings =
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    // Builds settings for a consumer subscribed to a single topic.
    def consumerSettings(topic: String, clientId: String) =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(bootstrapServers)
        .withPollInterval(0.millis)
        .withClientId(clientId)
        // The consumers below are started while the producer stream is already writing.
        // Kafka's client default for this setting is "latest", under which records written
        // before a consumer gets its partition assignment would be skipped and the
        // `take(msgCount)` below could never complete. Always read from the beginning.
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // publish the initial batch of events to topic1
    Source(1 to msgCount)
      .map(msg => new ProducerRecord[Array[Byte], String]("topic1", msg.toString))
      .runWith(Producer.plainSink(producerSettings))

    // publish events from topic1 to topic2
    Consumer.plainSource(consumerSettings("topic1", "client1"))
      .map(msg => new ProducerRecord[Array[Byte], String]("topic2", msg.value))
      .to(Producer.plainSink(producerSettings))
      .run()

    // collect the relayed events from topic2
    val sf =
      Consumer.plainSource(consumerSettings("topic2", "client2"))
        .take(msgCount)
        .runWith(Sink.seq)

    whenReady(sf) { result =>
      // Verify that every message actually made it through both topics.
      assert(result.size == msgCount)
      println(s"It took ${System.currentTimeMillis() - start} ms to pipe $msgCount messages..." )
    }
  }
}
@l15k4
Copy link
Author

l15k4 commented Jun 6, 2016

package com.example

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import org.scalatest.{Suite, BeforeAndAfterAll}

/**
 * Mix-in for suites that need to create and delete Kafka topics.
 *
 * Holds a `ZkUtils` handle for the lifetime of the suite instance and closes it
 * in `afterAll`, after the parent `afterAll` has run.
 */
trait KafkaSupport extends Suite with BeforeAndAfterAll {

  // ZooKeeper handle used by all AdminUtils calls below.
  // NOTE(review): the two 2000 values are presumably session/connection timeouts in ms and
  // `false` presumably disables ZooKeeper security - confirm against the Kafka version in use.
  val zkUtils: ZkUtils = ZkUtils("localhost:2181", 2000, 2000, false)

  /** Runs the parent teardown first, then always closes the ZooKeeper handle. */
  override def afterAll(): Unit = {
    try {
      super.afterAll()
    } finally {
      zkUtils.close()
    }
  }

  /**
   * Creates every given topic, then pauses for one second.
   *
   * NOTE(review): the `1, 1` arguments are presumably partition count and replication
   * factor, and the pause presumably gives the broker time to pick up the new topics -
   * confirm.
   *
   * @param topics names of the topics to create
   */
  def createCustomTopics(topics: String*): Unit = {
    for (name <- topics) {
      AdminUtils.createTopic(zkUtils, name, 1, 1)
    }
    Thread.sleep(1000)
  }

  /**
   * Deletes every given topic and then removes the consumer group info stored in
   * ZooKeeper for it.
   *
   * @param topics names of the topics to delete
   */
  def deleteCustomTopics(topics: String*): Unit = {
    for (name <- topics) {
      AdminUtils.deleteTopic(zkUtils, name)
      AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, name)
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment