@kiambogo
Last active September 22, 2020 00:01
FS2 1.0.0 groupBy
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.Queue

def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
    Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
      // Enqueue None on every per-key queue so each sub-stream terminates
      val cleanup = {
        import alleycats.std.all._
        st.get.flatMap(_.traverse_(_.enqueue1(None)))
      }

      (in ++ Stream.eval_(cleanup))
        .evalMap { el =>
          (selector(el), st.get).mapN { (key, queues) =>
            queues.get(key).fold {
              for {
                newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
                _ <- st.modify(x => (x + (key -> newQ), x)) // Update the ref of queues
                _ <- newQ.enqueue1(el.some)
              } yield (key -> newQ.dequeue.unNoneTerminate).some // Emit the new sub-stream once per key
            }(_.enqueue1(el.some) as None) // Known key: enqueue only, emit nothing
          }.flatten
        }.unNone.onFinalize(cleanup)
    }
}

kell18 commented Dec 21, 2018

The alleycats project seems to be deprecated now, and I could not find a Traverse instance for Map, so the simplest solution is to do the traversal at the List level:
st.get.flatMap(_.toList.traverse(_._2.enqueue1(None))).map(_ => ())
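
The same List-level cleanup can be written with `traverse_`, which discards the results directly. This is only a sketch assuming FS2 1.0 and cats-effect 1.x; `CleanupSketch` and `closeAll` are hypothetical names and not part of the gist.

```scala
import cats.effect.Sync
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.concurrent.Queue

object CleanupSketch {
  // Hypothetical helper (not part of the gist): enqueue None on every per-key
  // queue so that a sub-stream built with unNoneTerminate ends.
  // Uses the Foldable instance for List, so no alleycats import is needed.
  def closeAll[F[_]: Sync, K, A](st: Ref[F, Map[K, Queue[F, Option[A]]]]): F[Unit] =
    st.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))
}
```

Inside the gist's groupBy, `val cleanup = CleanupSketch.closeAll(st)` would then stand in for the alleycats-based block.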


kell18 commented Dec 24, 2018

Sorry, but this looks like abstract nonsense to me; I can't get my head around why it's not working.
It seems to be due to deadlocks:

import cats.arrow.FunctionK
import cats.effect.{ContextShift, IO}
import fs2.{Pure, Stream}
import scala.concurrent.duration.Duration

object Fs2TestApp extends App {
  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

  val toIO: FunctionK[Pure, IO] = new FunctionK[Pure, IO] {
    def apply[A](l: Pure[A]): IO[A] = IO(l)
  }

  val str = Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).translate(toIO)

  val r = groupBy[IO, Int, Int](x => IO(x % 2)).apply(str)
    .map {
      case (k, s1) =>
        println(s"$k, $s1") // only first key got here
        s1
    }

  // Times out: flatten never gets past the first sub-stream
  println(r.flatten.compile.toList.unsafeRunTimed(Duration.apply("15 seconds")))
}


kell18 commented Dec 24, 2018

The problem was in flatten. It works without it, but I still don't understand why. Explanations would be much appreciated. :)

Thank you for this code, kiambogo!


kell18 commented Dec 25, 2018

Okay, you need to join the inner streams with parJoin or parJoinUnbounded to make it work as a single stream again.
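
A likely explanation, going by how the gist's groupBy is built: a sub-stream only receives elements while the outer stream is being pulled, because the outer stream's evalMap is what enqueues into the per-key queues. flatten runs the first sub-stream to completion before it pulls the outer stream again, so that sub-stream waits on its queue forever. parJoinUnbounded runs the outer stream and all sub-streams concurrently, so the queues keep being fed. A minimal sketch, assuming the gist's groupBy is in scope and FS2 1.0 with cats-effect 1.x; `ParJoinExample` and its value names are illustrative only:

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

object ParJoinExample {
  // Assumption: the gist's `groupBy` is defined or imported in this scope.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val input: Stream[IO, Int] = Stream.emits(1 to 11).covary[IO]

  // Tag each element with its key, then run every sub-stream concurrently
  // with the outer stream instead of one after another.
  val joined: Stream[IO, (Int, Int)] =
    input
      .through(groupBy[IO, Int, Int](x => IO.pure(x % 2)))
      .map { case (k, sub) => sub.map(x => k -> x) }
      .parJoinUnbounded

  // The relative order of elements from different keys is not deterministic.
  val result: List[(Int, Int)] = joined.compile.toList.unsafeRunSync()
}
```

parJoin(n) does the same with at most n sub-streams running at once, which only helps when the number of distinct keys is known to stay at or below n; otherwise a sub-stream that has not been started yet is never drained.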


mjreiss commented Feb 15, 2019

In our use case, we have an unbounded number of keys and the streams never terminate. Here's a method we used in conjunction with groupBy to handle the problem:

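import cats.effect.{Concurrent, Timer}
import cats.effect.concurrent.Semaphore
import fs2.{Chunk, Pipe, Stream}
import scala.concurrent.duration.FiniteDuration

// Concurrent is needed by groupBy, parJoinUnbounded and Semaphore; Timer by groupWithin
def clientMessages[F[_]: Concurrent: Timer, A, K](
    batchSize: Int,
    batchTime: FiniteDuration,
    maxPending: Long
)(
    selector: A => F[K]
): Pipe[F, A, (K, Chunk[A])] = { in =>
  // Batch each key's sub-stream by size or time, then merge all keys
  val batches: Pipe[F, A, (K, Chunk[A])] = _.through(groupBy(selector)).map {
    case (k, kas) =>
      kas.groupWithin(batchSize, batchTime).map(c => k -> c)
  }.parJoinUnbounded
  // Allow at most maxPending elements in flight: acquire per element, release per emitted batch
  Stream.eval(Semaphore[F](maxPending)).flatMap { pending =>
    in.evalTap(_ => pending.acquire)
      .through(batches)
      .evalTap(kc => pending.releaseN(kc._2.size.toLong))
  }
}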
