Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created June 29, 2016 10:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/01cfd8dc81ca963bfd283d9eddd87068 to your computer and use it in GitHub Desktop.
Save l15k4/01cfd8dc81ca963bfd283d9eddd87068 to your computer and use it in GitHub Desktop.
package com.example
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.{Second, Seconds, Span}
import scala.concurrent.duration._
class KafkaPlayground extends BaseSuite with KafkaSupport with AkkaSupport with BeforeAndAfterEach {
implicit val futurePatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Second))
val KafkaHost = "localhost"
val zkUtils = ZkUtils(s"$KafkaHost:2181", 2000, 2000, false)
override def beforeEach(): Unit =
try {
createCustomTopics(1, Seq("topic1"), zkUtils)
Thread.sleep(1000)
} finally super.beforeEach()
override def afterEach(): Unit =
try {
deleteCustomTopics(Seq("topic1"), zkUtils)
} finally super.afterEach()
override def afterAll(): Unit = try zkUtils.close() finally super.afterAll()
"temp" in {
val start = System.currentTimeMillis()
val msgCount = 10
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(s"$KafkaHost:9092")
.withProperty("batch.size", "0")
def consumerSettings(topic: String, clientId: String) =
ConsumerSettings(system, new StringDeserializer, new StringDeserializer, Set("topic1"))
.withBootstrapServers(s"$KafkaHost:9092")
.withClientId(clientId)
.withPollInterval(15.millis)
// .withProperty("enable.auto.commit", "false")
// .withProperty("heartbeat.interval.ms", "300")
.withGroupId(clientId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val sf =
Consumer.committableSource(consumerSettings("topic1", "client1"))
.take(5)
.mapAsync (1)( result => result.committableOffset.commitScaladsl().map(_ => result))
.runWith(Sink.seq)
.flatMap { result1 =>
Thread.sleep(3000)
Consumer.committableSource(consumerSettings("topic1", "client1"))
.take(5)
.runWith(Sink.seq).map(result2 => result1 -> result2)
}
Thread.sleep(1000)
Source(1 to msgCount)
.map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
.runWith(Producer.plainSink(producerSettings))
whenReady(sf) { case (recs, recs2) =>
println(
s"""
|It took ${System.currentTimeMillis() - start} ms to pipe :
|recs1 : ${recs.map(_.value).mkString(" ")}
|recs2 : ${recs2.map(_.value).mkString(" ")}
|messages...
|""".stripMargin)
}
}
}
@l15k4
Copy link
Author

l15k4 commented Jun 29, 2016

If I use Control like this :

    val sf: Future[Done] = {
     val control =
      Consumer.committableSource(consumerSettings("topic1", "client1"))
        .take(5)
        .mapAsync(1)( result => result.committableOffset.commitScaladsl().map(_ => result))
        .grouped(5)
        .to(Sink.last).run()

        control.stop.flatMap(_ => control.shutdown())
        .flatMap { result1 =>
          Thread.sleep(1000)

          val control2 =
            Consumer.committableSource(consumerSettings("topic1", "client1"))
              .take(5)
              .mapAsync(1)( result => result.committableOffset.commitScaladsl().map(_ => result))
              .grouped(5)
              .to(Sink.last).run()
          control2.stop().flatMap(_ => control2.shutdown())
        }
    }

There is an exception :

[info]   The future returned an exception of type: java.lang.IllegalStateException, with message: not yet initialized: only setHandler is allowed in GraphStageLogic constructor. (KafkaPlayground.scala:84)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   ...
[info]   at gwi.gwiq.pm.KafkaPlayground$$anonfun$1.apply$mcV$sp(KafkaPlayground.scala:84)
[info]   at gwi.gwiq.pm.KafkaPlayground$$anonfun$1.apply(KafkaPlayground.scala:37)
[info]   at gwi.gwiq.pm.KafkaPlayground$$anonfun$1.apply(KafkaPlayground.scala:37)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.lang.IllegalStateException: not yet initialized: only setHandler is allowed in GraphStageLogic constructor
[info]   at akka.stream.stage.GraphStageLogic.interpreter(GraphStage.scala:239)
[info]   at akka.stream.stage.GraphStageLogic.materializer(GraphStage.scala:245)
[info]   at akka.kafka.internal.StageLogging$class.log(StageLogging.scala:22)
[info]   at akka.kafka.internal.ConsumerStageLogic.log(ConsumerStage.scala:333)
[info]   at akka.kafka.internal.ConsumerStageLogic.stop(ConsumerStage.scala:464)
[info]   at gwi.gwiq.pm.KafkaPlayground$$anonfun$1$$anonfun$4.apply(KafkaPlayground.scala:74)
[info]   at gwi.gwiq.pm.KafkaPlayground$$anonfun$1$$anonfun$4.apply(KafkaPlayground.scala:65)
[info]   at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
[info]   at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
[info]   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
[info]   ...

However if I don't call shutdown on the second Stream it doesn't happen ... Offset committing still doesn't work though :-/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment