Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
fs2 `groupBy/partitions`
/**
 * Splits a stream into sub-streams, one per distinct key computed by `selector`.
 *
 * For every input element the key is computed effectfully. The first time a key
 * is seen, a new queue is created for it, the element is enqueued, and the pair
 * `(key, sub-stream reading from that queue)` is emitted downstream. For a key
 * that already has a queue, the element is only enqueued and nothing is emitted.
 *
 * Memory: the map of queues grows with the number of distinct `K`; nothing in
 * this block removes an entry once it has been added. The queues are created
 * with `Queue.unbounded`, so presumably a slow sub-stream consumer buffers
 * elements rather than slowing down the upstream -- TODO confirm.
 *
 * @tparam F effect type of the stream
 * @tparam A element type of the input stream
 * @tparam K key type; used as the key of an immutable `Map`
 * @param selector effectful function computing the key of each element
 * @param F `Effect` instance for `F`
 * @param ec not referenced explicitly in the body; presumably required
 *           implicitly by `async.refOf` / `Queue.unbounded` -- TODO confirm
 * @return a pipe emitting each newly seen key paired with the sub-stream of
 *         that key's elements
 */
def partitions[F[_], A, K](selector: A => F[K])(implicit F: Effect[F], ec: ExecutionContext) : Pipe[F, A, (K, Stream[F, A])] = in =>
// Shared state: key -> queue feeding that key's sub-stream. `None` is the end-of-stream marker.
Stream.eval(async.refOf[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
// Enqueues the `None` terminator into every queue currently in the map.
val cleanup = {
// Presumably supplies the instance that allows `traverse_` over the values of a `Map`.
import alleycats.std.all._
st.get.flatMap(_.traverse_(_.enqueue1(None)))
}
// `cleanup` is hooked in twice: once right after the input is exhausted (here) and once
// as a finalizer of the resulting stream (`onFinalize` at the end of this expression).
(in ++ Stream.eval_(cleanup)).evalMap { el =>
// Compute the element's key and read the current map of queues.
(selector(el), st.get).mapN { (key, queues) =>
queues.get(key).fold(
// Unseen key: create its queue, register it, enqueue the element, and emit
// (key, sub-stream) where the sub-stream dequeues until the `None` terminator.
for {
newQ <- Queue.unbounded[F, Option[A]]
_ <- st.modify(_ + (key -> newQ))
_ <- newQ.enqueue1(el.some)
} yield (key -> newQ.dequeue.unNoneTerminate).some
// Known key: only enqueue the element; emit nothing for it.
)(_.enqueue1(el.some) as None)
// `mapN` produced an F[F[...]]; collapse it.
}.flatten
// Drop the `None`s produced for already-known keys, leaving only the new (key, sub-stream) pairs.
}.unNone.onFinalize(cleanup)
}
import ExecutionContext.Implicits.global
/** Demo key function: the remainder of `i` divided by 3, lifted into `IO`. */
def selector(i: Int): IO[Int] = {
  val bucket = i % 3
  IO.pure(bucket)
}
/**
 * Pipe that passes elements through unchanged, pairing each one with a
 * randomly timed pause (a delay drawn from `Random.nextInt(500)`, in
 * milliseconds) to simulate an unevenly paced consumer.
 */
def flakiness[A]: Pipe[IO, A, A] = upstream => {
  // Draw a fresh delay each time the effect is run.
  val pickDelay = IO(scala.util.Random.nextInt(500))
  val randomPause = pickDelay.flatMap(ms => Timer[IO].sleep(ms.millis))
  // Zip an endless stream of pauses with the input and keep only the input element.
  Stream.repeatEval(randomPause).zip(upstream).map { case (_, element) => element }
}
/**
 * Demo: partitions the numbers 1 until 100 by `selector`, tags every element
 * of each sub-stream with its key, pushes it through `flakiness`, prints it to
 * stdout, and runs all sub-streams concurrently until completion.
 */
def example = {
  // Outer stream of (key, sub-stream) pairs.
  val grouped = Stream.range(1, 100).covary[IO].through(partitions(selector))
  // One printing stream per key; elements are shown as (key, value).
  val printers = grouped.map { case (key, substream) =>
    substream.tupleLeft(key).through(flakiness).through(Sink.showLinesStdOut)
  }
  printers.joinUnbounded.compile.drain.unsafeRunSync()
}
@kiambogo
Copy link

kiambogo commented Oct 25, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment