Created
June 29, 2016 10:08
-
-
Save l15k4/01cfd8dc81ca963bfd283d9eddd87068 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import akka.kafka.scaladsl.{Consumer, Producer} | |
import akka.kafka.{ConsumerSettings, ProducerSettings} | |
import akka.stream.scaladsl.{Sink, Source} | |
import kafka.utils.ZkUtils | |
import org.apache.kafka.clients.consumer.ConsumerConfig | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} | |
import org.scalatest.BeforeAndAfterEach | |
import org.scalatest.time.{Second, Seconds, Span} | |
import scala.concurrent.duration._ | |
/**
 * Exploratory integration test ("playground") for akka-stream-kafka:
 * produces `msgCount` messages to `topic1`, consumes and commits the first
 * five, then re-consumes with the same group id to observe whether the
 * committed offsets take effect.
 *
 * Requires a local Kafka broker on :9092 and Zookeeper on :2181
 * (provided by `KafkaSupport`).
 */
class KafkaPlayground extends BaseSuite with KafkaSupport with AkkaSupport with BeforeAndAfterEach {

  // Generous patience: topic creation and consumer-group rebalancing can take seconds.
  implicit val futurePatience: PatienceConfig =
    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Second))

  val KafkaHost = "localhost"
  // session timeout 2000 ms, connection timeout 2000 ms, no secure ACLs
  val zkUtils = ZkUtils(s"$KafkaHost:2181", 2000, 2000, false)

  override def beforeEach(): Unit =
    try {
      createCustomTopics(1, Seq("topic1"), zkUtils)
      // Give the broker a moment to propagate the freshly created topic
      // before the test starts producing/consuming.
      Thread.sleep(1000)
    } finally super.beforeEach()

  override def afterEach(): Unit =
    try {
      deleteCustomTopics(Seq("topic1"), zkUtils)
    } finally super.afterEach()

  override def afterAll(): Unit = try zkUtils.close() finally super.afterAll()

  "temp" in {
    val start = System.currentTimeMillis()
    val msgCount = 10

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers(s"$KafkaHost:9092")
        .withProperty("batch.size", "0") // disable batching so messages are sent immediately

    /**
     * Consumer settings subscribed to `topic` under group/client `clientId`.
     * NOTE: the original version hard-coded Set("topic1") here, silently
     * ignoring the `topic` parameter — fixed to honour it.
     */
    def consumerSettings(topic: String, clientId: String) =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(s"$KafkaHost:9092")
        .withClientId(clientId)
        .withPollInterval(15.millis)
        // .withProperty("enable.auto.commit", "false")
        // .withProperty("heartbeat.interval.ms", "300")
        .withGroupId(clientId) // same group id for both passes so commits are shared
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Pass 1: consume 5 messages, committing each offset; then
    // Pass 2: re-consume with the same group id — if commits worked,
    // the second pass should start after the committed offsets.
    val sf =
      Consumer.committableSource(consumerSettings("topic1", "client1"))
        .take(5)
        .mapAsync(1)(result => result.committableOffset.commitScaladsl().map(_ => result))
        .runWith(Sink.seq)
        .flatMap { result1 =>
          // crude wait for the first consumer's group session to end before rejoining
          Thread.sleep(3000)
          Consumer.committableSource(consumerSettings("topic1", "client1"))
            .take(5)
            .runWith(Sink.seq)
            .map(result2 => result1 -> result2)
        }

    // let the consumer stream subscribe before producing
    Thread.sleep(1000)

    Source(1 to msgCount)
      .map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
      .runWith(Producer.plainSink(producerSettings))

    whenReady(sf) { case (recs, recs2) =>
      println(
        s"""
           |It took ${System.currentTimeMillis() - start} ms to pipe :
           |recs1 : ${recs.map(_.value).mkString(" ")}
           |recs2 : ${recs2.map(_.value).mkString(" ")}
           |messages...
           |""".stripMargin)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If I use `Control` like this, there is an exception.
However, if I don't call `shutdown` on the second stream, it doesn't happen... Offset committing still doesn't work, though :-/