@notxcain
Created May 28, 2019 13:22
Alpakka Kafka Fs2 interop
package aecor.kafkadistributedprocessing
import java.time.Duration
import java.util
import java.util.concurrent.Executors
import aecor.util.effect._
import akka.NotUsed
import akka.kafka.ConsumerMessage.PartitionOffset
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ AutoSubscription, ConsumerSettings }
import akka.stream.Materializer
import akka.stream.scaladsl.{ Keep, Source, Sink => AkkaSink }
import cats.effect.concurrent.{ Deferred, Ref }
import cats.effect.{ ConcurrentEffect, ContextShift, ExitCase, IO, Timer }
import cats.implicits._
import cats.effect.implicits._
import cats.~>
import fs2.Stream
import fs2.concurrent.Queue
import fs2.interop.reactivestreams._
import org.apache.kafka.clients.consumer.{
  ConsumerRebalanceListener,
  ConsumerRecord,
  Consumer => KafkaConsumer
}
import org.apache.kafka.common
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
object interop {

  // Runs an Akka Source, exposing both its materialized value and its elements as an fs2 Stream
  // backed by a reactive-streams publisher.
  def sourceToStream[F[_], A, Mat](source: Source[A, Mat], materializer: Materializer)(
    implicit F: ConcurrentEffect[F]
  ): Stream[F, (Mat, Stream[F, A])] =
    Stream
      .eval(F.delay {
        source
          .toMat(AkkaSink.asPublisher(false))(Keep.both)
          .run()(materializer)
      })
      .map {
        case (mat, publisher) =>
          (mat, publisher.toStream[F]())
      }

  final case class TopicPartition[F[_], A](topic: String, partition: Int, messages: Stream[F, A])

  final case class CommittableMessage[F[_], K, V](record: ConsumerRecord[K, V],
                                                  committableOffset: CommittableOffset[F])

  final case class CommittableOffset[F[_]](commit: F[Unit], partitionOffset: PartitionOffset)

  // `watchRevocation` completes when the partition is being revoked; the inner F[Unit] must be
  // run to acknowledge the revocation.
  final case class AssignedPartition[F[_]](partition: Int,
                                           partitionCount: Int,
                                           watchRevocation: F[F[Unit]])

  sealed abstract class RebalanceCommand[F[_]]

  object RebalanceCommand {
    final case class RevokePartitions[F[_]](partitions: Set[Int], commit: F[Unit])
        extends RebalanceCommand[F]
    final case class AssignPartitions[F[_]](partitions: Set[Int], commit: F[Unit])
        extends RebalanceCommand[F]
  }

  // A ConsumerRebalanceListener that turns Kafka's callbacks into RebalanceCommand values on a
  // queue and blocks the Kafka thread until the consumer of the command signals completion.
  final class EnqueueingRebalanceListener[F[_]](enqueue: RebalanceCommand[F] => F[Unit])(
    implicit F: ConcurrentEffect[F]
  ) extends ConsumerRebalanceListener {

    private def enqueueWithCompletionToken[A](f: F[Unit] => RebalanceCommand[F]): F[Unit] =
      Deferred[F, Unit].flatMap(completion => enqueue(f(completion.complete(()))) >> completion.get)

    override def onPartitionsRevoked(partitions: util.Collection[common.TopicPartition]): Unit =
      F.toIO(
          enqueueWithCompletionToken(
            RebalanceCommand.RevokePartitions(partitions.asScala.map(_.partition()).toSet, _)
          )
        )
        .unsafeRunSync()

    override def onPartitionsAssigned(partitions: util.Collection[common.TopicPartition]): Unit =
      F.toIO(
          enqueueWithCompletionToken(
            RebalanceCommand.AssignPartitions(partitions.asScala.map(_.partition()).toSet, _)
          )
        )
        .unsafeRunSync()
  }

  object EnqueueingRebalanceListener {
    final class UsePartiallyApplied[F[_]] {
      def use[A](
        subscribe: ConsumerRebalanceListener => F[A]
      )(implicit F: ConcurrentEffect[F]): F[(A, Stream[F, RebalanceCommand[F]])] =
        for {
          queue <- Queue.unbounded[F, RebalanceCommand[F]]
          listener = new EnqueueingRebalanceListener[F](queue.enqueue1)
          a <- subscribe(listener)
        } yield (a, queue.dequeue)
    }

    def apply[F[_]]: UsePartiallyApplied[F] = new UsePartiallyApplied[F]
  }
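
  // Subscribes the consumer to `topic`, polls it in the background and emits an AssignedPartition
  // for every partition the group coordinator hands to this instance. Rebalance callbacks are
  // translated into assignment of new partitions and revocation of previously emitted ones.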
  def assignPartitions[F[_], K, V](consumerSettings: ConsumerSettings[K, V], topic: String)(
    implicit F: ConcurrentEffect[F],
    timer: Timer[F],
    contextShift: ContextShift[F]
  ): Stream[F, AssignedPartition[F]] =
    Stream
      .bracket(F.delay {
        val consumer = consumerSettings.createKafkaConsumer()
        val executor = Executors.newSingleThreadExecutor()
        // Natural transformation that runs every consumer operation on a dedicated
        // single-threaded executor, since KafkaConsumer is not thread-safe.
        new ((KafkaConsumer[K, V] => ?) ~> F) {
          override def apply[A](f: KafkaConsumer[K, V] => A): F[A] =
            contextShift.evalOn(ExecutionContext.fromExecutor(executor)) {
              F.async[A] { cb =>
                executor.execute(new Runnable {
                  override def run(): Unit =
                    cb {
                      try Right(f(consumer))
                      catch {
                        case e: Throwable => Left(e)
                      }
                    }
                })
              }
            }
        }
      })(accessConsumer => accessConsumer(_.close()))
      .flatMap { accessConsumer =>
        val fetchPartitionCount = accessConsumer(_.partitionsFor(topic).size())
        def subscribe(listener: ConsumerRebalanceListener) =
          accessConsumer(_.subscribe(List(topic).asJava, listener))
        val poll = accessConsumer(_.poll(Duration.ofMillis(50))).void

        Stream
          .eval(
            EnqueueingRebalanceListener[F]
              .use(
                listener =>
                  subscribe(listener) >>
                    fetchPartitionCount
              )
          )
          .flatMap {
            case (partitionCount, rebalanceCommands) =>
              rebalanceCommands
                .concurrently(Stream.repeatEval(poll >> timer.sleep(500.millis)))
                .evalScan((List.empty[AssignedPartition[F]], Map.empty[Int, F[Unit] => F[Unit]])) {
                  case ((_, revocationCallbacks), command) =>
                    command match {
                      case RebalanceCommand.RevokePartitions(partitions, commit) =>
                        // Notify each revoked partition and wait for it to acknowledge before
                        // letting the rebalance listener proceed.
                        partitions.toList
                          .traverse_ { partition =>
                            revocationCallbacks.get(partition).traverse { callback =>
                              Deferred[F, Unit].flatMap { revocationCompletion =>
                                callback(revocationCompletion.complete(())) >> revocationCompletion.get
                              }
                            }
                          } >> commit.as((List.empty, revocationCallbacks -- partitions))
                      case RebalanceCommand.AssignPartitions(partitions, commit) =>
                        partitions.toList
                          .traverse { p =>
                            Deferred[F, F[Unit]].flatMap { revocation =>
                              Ref[F].of(false).map { cancelled =>
                                (
                                  AssignedPartition(p, partitionCount, revocation.get.guaranteeCase {
                                    case ExitCase.Canceled => cancelled.set(true)
                                    case _                 => ().pure[F]
                                  }),
                                  (complete: F[Unit]) =>
                                    cancelled.get.ifM(complete, revocation.complete(complete))
                                )
                              }
                            }
                          }
                          .flatMap { list =>
                            val assignedPartitions = list.map(_._1)
                            val updatedRevocationCallbacks = revocationCallbacks ++ list.map(
                              x => (x._1.partition, x._2)
                            )
                            commit.as((assignedPartitions, updatedRevocationCallbacks))
                          }
                    }
                }
          }
          .flatMap {
            case (assignedPartitions, _) =>
              Stream.emits(assignedPartitions)
          }
      }
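
  // Runs Alpakka's committablePartitionedSource and re-exposes it as an fs2 Stream of
  // per-partition streams. Each CommittableMessage pairs a Kafka record with an effect that
  // commits its offset; the underlying consumer is drained and shut down when the stream of
  // partitions finalizes.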
  def committablePartitionedStream[F[_], K, V](
    consumerSettings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    materializer: Materializer
  )(implicit F: ConcurrentEffect[F]): Stream[F, TopicPartition[F, CommittableMessage[F, K, V]]] =
    Consumer
      .committablePartitionedSource(consumerSettings, subscription)
      .toStream[F](materializer)
      .flatMap {
        case (control, stream) =>
          val shutdown =
            F.liftIO(
                IO.fromFuture(
                  IO(control.drainAndShutdown(Future.successful(()))(materializer.executionContext))
                )
              )
              .void
          stream.onFinalize(shutdown).map {
            case (tp, source) =>
              val messages = source.toStream[F](materializer).map { cm =>
                CommittableMessage(
                  cm.record,
                  CommittableOffset(
                    F.fromFuture(cm.committableOffset.commitScaladsl()).void,
                    cm.committableOffset.partitionOffset
                  )
                )
              }
              TopicPartition(tp.topic(), tp.partition(), messages)
          }
      }

  def plainPartitionedStream[F[_], K, V](
    consumerSettings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    materializer: Materializer
  )(implicit F: ConcurrentEffect[F]): Stream[F, TopicPartition[F, ConsumerRecord[K, V]]] =
    Consumer
      .plainPartitionedSource(consumerSettings, subscription)
      .toStream[F](materializer)
      .flatMap {
        case (control, stream) =>
          val shutdown =
            F.liftIO(
                IO.fromFuture(
                  IO(control.drainAndShutdown(Future.successful(()))(materializer.executionContext))
                )
              )
              .void
          stream.onFinalize(shutdown).map {
            case (tp, source) =>
              val messages = source.toStream[F](materializer)
              TopicPartition(tp.topic(), tp.partition(), messages)
          }
      }

  implicit final class ConsumerSettingsOps[K, V](val self: ConsumerSettings[K, V]) extends AnyVal {
    def committablePartitionedStream[F[_]: ConcurrentEffect](
      subscription: AutoSubscription,
      materializer: Materializer
    ): Stream[F, TopicPartition[F, CommittableMessage[F, K, V]]] =
      interop.committablePartitionedStream(self, subscription, materializer)

    def plainPartitionedStream[F[_]: ConcurrentEffect](
      subscription: AutoSubscription,
      materializer: Materializer
    ): Stream[F, TopicPartition[F, ConsumerRecord[K, V]]] =
      interop.plainPartitionedStream(self, subscription, materializer)
  }

  implicit final class SourceToStreamOps[A, Mat](val source: Source[A, Mat]) extends AnyVal {
    def toStream[F[_]](
      materializer: Materializer
    )(implicit F: ConcurrentEffect[F]): Stream[F, (Mat, Stream[F, A])] =
      sourceToStream(source, materializer)
  }

  implicit final class SourceWithNotUsedMatToStreamOps[A](val source: Source[A, NotUsed])
      extends AnyVal {
    def toStream[F[_]](materializer: Materializer)(implicit F: ConcurrentEffect[F]): Stream[F, A] =
      sourceToStream(source, materializer).flatMap(_._2)
  }
}
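
A minimal usage sketch, assuming the same cats-effect, fs2 and Alpakka Kafka versions the code above compiles against; the bootstrap servers, group id and topic name are placeholders:

import aecor.kafkadistributedprocessing.interop._
import akka.actor.ActorSystem
import akka.kafka.{ ConsumerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import cats.effect.{ ExitCode, IO, IOApp }
import cats.implicits._
import org.apache.kafka.common.serialization.StringDeserializer

object Example extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    implicit val system: ActorSystem = ActorSystem("example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val settings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("example-group")

    // Consume every partition concurrently, printing record values and committing offsets.
    // ActorSystem termination is omitted for brevity.
    settings
      .committablePartitionedStream[IO](Subscriptions.topics("example-topic"), materializer)
      .map { tp =>
        tp.messages.evalMap { m =>
          IO(println(s"${tp.topic}-${tp.partition}: ${m.record.value()}")) >>
            m.committableOffset.commit
        }
      }
      .parJoinUnbounded
      .compile
      .drain
      .as(ExitCode.Success)
  }
}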