Skip to content

Instantly share code, notes, and snippets.

@notxcain
Created March 20, 2019 10:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save notxcain/2cfa7d823d3ea049cc85f63ed0abecff to your computer and use it in GitHub Desktop.
Save notxcain/2cfa7d823d3ea049cc85f63ed0abecff to your computer and use it in GitHub Desktop.
fs2 with Alpakka Kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink => AkkaSink}
import cats.effect.{ConcurrentEffect, ExitCode, IO, Resource}
import cats.implicits._
import fs2.interop.reactivestreams._
import org.apache.kafka.common.serialization.StringDeserializer
/** Demonstrates consuming an Alpakka Kafka source as an fs2 stream via the
  * reactive-streams interop.
  */
object AkkaTest {

  /** Reads the first 50 committable messages from the Kafka topic `test-topic`
    * (broker `localhost:9092`, consumer group `test`), prints them one per line,
    * and completes with [[cats.effect.ExitCode.Success]].
    *
    * Everything that needs cleanup (the actor system and the running Kafka
    * consumer) is managed by a [[cats.effect.Resource]]; the actual work happens
    * in `use`, so acquisition and program logic stay separate.
    *
    * @tparam F effect type; a `ConcurrentEffect` is required by the fs2
    *           reactive-streams interop and is used to lift the `Future`-based
    *           shutdown calls
    * @return the exit code of the program, always `ExitCode.Success` when the
    *         effect completes without error
    */
  def run[F[_]](implicit F: ConcurrentEffect[F]): F[ExitCode] = {
    val resources = for {
      // Terminating the actor system on release also tears down the materializer.
      system <- Resource.make(F.delay(ActorSystem()))(
        sys => F.liftIO(IO.fromFuture(IO(sys.terminate()))).void
      )
      mat <- Resource.liftF(F.delay(ActorMaterializer()(system)))
      consumerSettings <- Resource.liftF(
        F.delay(
          ConsumerSettings(
            system,
            new StringDeserializer,
            new StringDeserializer
          ).withBootstrapServers("localhost:9092")
            .withGroupId("test")
        )
      )
      // Materialize the Kafka source into (control, publisher); the control is
      // only needed to shut the consumer down on release, so it is dropped from
      // the resource's value afterwards.
      publisher <- Resource
        .make(F.delay {
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics("test-topic"))
            .toMat(AkkaSink.asPublisher(fanout = false))(Keep.both)
            .run()(mat)
        }) {
          case (control, _) => F.liftIO(IO.fromFuture(IO(control.shutdown()))).void
        }
        .map { case (_, pub) => pub }
    } yield publisher

    resources.use { publisher =>
      for {
        messages <- publisher.toStream[F].take(50).compile.toVector
        _ <- F.delay(println(messages.mkString("\n")))
      } yield ExitCode.Success
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment