Skip to content

Instantly share code, notes, and snippets.

@muller
Created November 15, 2016 10:06
Show Gist options
  • Save muller/0fd39738b940712e65295d8a6ed8775a to your computer and use it in GitHub Desktop.
Save muller/0fd39738b940712e65295d8a6ed8775a to your computer and use it in GitHub Desktop.
package consumer
import akka.actor._
import akka.kafka.ConsumerMessage._
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream._
import akka.stream.scaladsl._
import akka.testkit._
import net.manub.embeddedkafka._
import org.apache.kafka.common.serialization._
import org.scalatest._
import scala.concurrent.duration._
/**
 * Integration spec demonstrating how to run an ordinary `Flow[In, Out, _]` over the values of a
 * committable Kafka source while committing each record's offset once its output is available.
 *
 * The suite starts an embedded Kafka broker in `beforeAll` and tears down both the actor system
 * and the broker in `afterAll`.
 */
class ZipperSpec
    extends TestKit(ActorSystem())
    with FlatSpecLike
    with Matchers
    with EmbeddedKafka
    with BeforeAndAfterAll {

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Fixture payloads; published to the topic in this order. */
  val Orion: Array[String] =
    "Betelgeuse Rigel Bellatrix Mintaka Alnilam Alnitak Saiph".split(' ')

  // NOTE(review): localhost:6001 is presumably the embedded broker's default port — confirm
  // against the embedded-kafka configuration in use.
  val consumerSettings: ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:6001")
      .withGroupId("test")
      // Start from the beginning of the topic so records published before the consumer
      // subscribes are still delivered.
      .withProperty(
        org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest")

  override def beforeAll(): Unit = EmbeddedKafka.start()

  /**
   * Terminates the actor system (and with it the still-running consumer stream) before stopping
   * the broker; the broker is stopped even if the actor system shutdown throws.
   */
  override def afterAll(): Unit =
    try TestKit.shutdownActorSystem(system)
    finally EmbeddedKafka.stop()

  it should "read and commit" in {
    Orion.foreach { star =>
      publishStringMessageToKafka("orion", star)
    }

    // The stream never completes on its own; results are forwarded to the test actor and the
    // stream is torn down together with the actor system in `afterAll`.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("orion"))
      .via(committable(flow))
      .runWith(Sink.actorRef(testActor, None))

    // Allow time for the consumer to join its group on the freshly started broker.
    receiveN(Orion.length, 10.seconds) should be(
      Seq("BETELGEUSE", "RIGEL", "BELLATRIX", "MINTAKA", "ALNILAM", "ALNITAK", "SAIPH"))
    expectNoMsg()
  }

  /** The business logic under test: upper-cases every incoming string. */
  def flow = Flow[String].map {
    _.toUpperCase
  }

  /**
   * Lifts `flow` so that it can be attached to a committable Kafka source.
   *
   * Each incoming message is split into the message itself and its record value. The value is
   * sent through `flow`, the message bypasses it, and the two are zipped back together so the
   * message's offset can be committed before the output is emitted downstream.
   *
   * Because outputs are re-associated with messages purely by position (via `Zip`), `flow` must
   * emit exactly one element per input element and must preserve order. A flow that filters,
   * expands or reorders elements would pair outputs with the wrong offsets.
   *
   * @param flow              the processing stage to run on each record value
   * @param commitParallelism maximum number of offset commits in flight at once; passed to
   *                          `mapAsync`. Defaults to 1 so commits are issued one at a time.
   * @tparam In  type of the Kafka record value consumed by `flow`
   * @tparam Out type of the element produced by `flow`
   * @return a flow from committable messages to the outputs of `flow`, each emitted only after
   *         the commit future for the corresponding message has completed
   */
  def committable[In, Out](
      flow: Flow[In, Out, _],
      commitParallelism: Int = 1
  ): Flow[CommittableMessage[_, In], Out, akka.NotUsed] = {
    val graph = GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Pair each message with its payload so the message can travel around `flow`.
      val split = Flow[CommittableMessage[_, In]]
        .map(it => it -> it.record.value)

      val commit = Flow[(CommittableMessage[_, In], Out)]
        .mapAsync(commitParallelism) {
          case (message, out) =>
            // Pass the execution context explicitly rather than depending on whichever
            // implicit one a mixed-in trait may or may not provide.
            message.committableOffset.commitScaladsl().map(_ => out)(system.dispatcher)
        }

      val in = b.add(split)
      val unzip = b.add(Unzip[CommittableMessage[_, In], In])
      val zip = b.add(Zip[CommittableMessage[_, In], Out])
      val done = b.add(commit)

      // @formatter:off
      in.out     ~> unzip.in
      unzip.out0 ~>         zip.in0
      unzip.out1 ~> flow ~> zip.in1
      zip.out    ~> done.in
      // @formatter:on

      FlowShape(in.in, done.out)
    }
    Flow.fromGraph(graph)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment