Created July 9, 2018 10:32
fs2 `groupBy/partitions`
// Pipe that splits a stream into per-key sub-streams; each element's key is computed effectfully by `selector`.
// Output elements are `(key, subStream)` pairs; in the code visible here a pair is produced when a new
// queue is created and registered for a key.
// Grows with the number of distinct `K`: the `Map[K, Queue[F, Option[A]]]` held in the ref gains one
// entry per registered key, and nothing in this block removes entries.
def partitions[F[_], A, K](selector: A => F[K])(implicit F: Effect[F], ec: ExecutionContext) : Pipe[F, A, (K, Stream[F, A])] = in =>
// Shared state: a ref holding the key -> queue map, initially empty. Queues carry `Option[A]`;
// the sub-streams built below are cut off at the first `None` via `unNoneTerminate`.
Stream.eval(async.refOf[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
// NOTE(review): the body of `cleanup` is missing here — presumably it enqueues `None` on every
// registered queue so that the sub-streams terminate; confirm against the full source.
val cleanup = {
import alleycats.std.all._
// `cleanup` is appended after the input so it is evaluated once the input has been consumed;
// `eval_` discards its result, so it contributes no elements.
(in ++ Stream.eval_(cleanup)).evalMap { el =>
// Compute the element's key and read the current queue map together.
(selector(el), st.get).mapN { (key, queues) =>
// Effect that creates an unbounded queue, registers it under `key`, enqueues the element,
// and yields `Some((key, subStream))` where `subStream` drains that queue.
for {
newQ <- Queue.unbounded[F, Option[A]]
_ <- st.modify(_ + (key -> newQ))
_ <- newQ.enqueue1(el.some)
} yield (key -> newQ.dequeue.unNoneTerminate).some
// Second argument list of a call whose opening is not visible here: given a queue, enqueue the
// element and produce `None`. NOTE(review): presumably the "queue already exists for `key`"
// branch of a lookup in `queues` — confirm against the full source.
)(_.enqueue1(el.some) as None)
// Example key function: the key is the remainder of `i` divided by 3 (Scala `%`),
// wrapped in `IO` as an already-computed value.
def selector(i: Int): IO[Int] = {
  val bucket = i % 3
  IO.pure(bucket)
}
// Pipe over `IO` that passes elements of type `A` through unchanged in type.
// NOTE(review): only the `wait` helper is present; how it is applied to `in` is missing here —
// presumably each element is delayed by `wait` before being re-emitted; confirm against the full source.
def flakiness[A]: Pipe[IO, A, A] = in => {
// Draws a random integer `d` with 0 <= d < 500 and sleeps for `d` milliseconds using the `IO` timer.
def wait = IO(scala.util.Random.nextInt(500)).flatMap(d => Timer[IO].sleep(d.millis))
// Demo wiring: feeds `Stream.range(1, 100)` (lifted into `IO`) through `partitions(selector)`.
// NOTE(review): the end of this definition is missing here, so how the resulting inner streams
// are combined and run is not shown — confirm against the full source.
def example =
Stream.range(1, 100).covary[IO].through(partitions(selector)).map {
// For each (key, sub-stream) pair: tag every element with its key, pass it through `flakiness`,
// and send the result to the stdout line-printing sink.
case (k, st) => st.tupleLeft(k).through(flakiness).through(Sink.showLinesStdOut)
