FS2 1.0.0 groupBy
Gist kiambogo/8247a7bbf79f00414d1489b7e6fc90d0, last active September 22, 2020 00:01
import fs2.{Pipe, Stream}
import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.Ref

def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
    Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
      // Sends the None terminator to every per-key queue so each sub-stream can complete
      val cleanup = {
        import alleycats.std.all._ // Traverse[Map] instance; needs the alleycats-core dependency
        st.get.flatMap(_.traverse_(_.enqueue1(None)))
      }
      (in ++ Stream.eval_(cleanup))
        .evalMap { el =>
          (selector(el), st.get).mapN { (key, queues) =>
            queues.get(key).fold {
              // First element for this key: create its queue and emit the new sub-stream
              for {
                newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
                _ <- st.update(_ + (key -> newQ)) // Register the queue under its key
                _ <- newQ.enqueue1(el.some)
              } yield (key -> newQ.dequeue.unNoneTerminate).some
            }(_.enqueue1(el.some) as None) // Known key: enqueue only, emit nothing
          }.flatten
        }.unNone.onFinalize(cleanup)
    }
}
Sorry, but it looks like abstract nonsense to me; I can't get my head around why it's not working.
It seems to be caused by a deadlock:
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Fs2TestApp extends App {
  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  // covary lifts the pure stream into IO (no FunctionK[Pure, IO] needed)
  val str = Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).covary[IO]
  val r = groupBy[IO, Int, Int](x => IO(x % 2)).apply(str)
    .map {
      case (k, s1) =>
        println(s"$k, $s1") // only the first key gets here
        s1
    }
  println(r.flatten.compile.toList.unsafeRunTimed(15.seconds))
}
The problem was in flatten. Without it, it works, but I still can't understand why. Explanations are much appreciated. :)
Thank you for this code, kiambogo!
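Okay, you need to join the inner streams with parJoin or parJoinUnbounded to turn them back into a single stream. flatten runs each inner stream to completion before pulling the next element of the outer stream, so the first group's stream waits forever for elements (and the None terminator) that only the outer stream could enqueue, and the outer stream is no longer being pulled. parJoinUnbounded keeps pulling the outer stream while the inner streams run concurrently. With parJoin(maxOpen), maxOpen has to be at least the number of distinct keys, otherwise the outer stream blocks in the same way.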
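A minimal sketch of the mechanism, independent of groupBy (the object name is hypothetical; assumes fs2 1.0.x and cats-effect 2.x). The outer stream first emits an inner stream that reads from a queue and only afterwards enqueues an element and the None terminator, which is the same shape groupBy produces. With flatten this hangs; with parJoinUnbounded it completes.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.concurrent.Queue

object FlattenVsParJoin extends IOApp {
  def program: IO[List[Int]] =
    Stream.eval(Queue.unbounded[IO, Option[Int]]).flatMap { q =>
      // The consumer is emitted first, the producer work comes later in the outer stream,
      // just like groupBy emits a key's sub-stream before enqueueing later elements.
      val outer: Stream[IO, Stream[IO, Int]] =
        Stream.emit(q.dequeue.unNoneTerminate) ++
          Stream.eval_(q.enqueue1(Some(1))) ++
          Stream.eval_(q.enqueue1(None))

      // outer.flatten would hang here: the inner stream blocks on q.dequeue
      // and the outer stream is never resumed to enqueue anything.
      outer.parJoinUnbounded
    }.compile.toList

  def run(args: List[String]): IO[ExitCode] =
    program.flatMap(xs => IO(println(xs))).map(_ => ExitCode.Success) // prints List(1)
}
```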
In our use case, we have an unbounded number of keys and the streams never terminate. Here's a method we used in conjunction with groupBy to handle the problem:
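import cats.effect.{Concurrent, Timer}
import cats.effect.concurrent.Semaphore
import fs2.{Chunk, Pipe, Stream}
import scala.concurrent.duration.FiniteDuration

// Concurrent and Timer are needed by groupBy, groupWithin, parJoinUnbounded and Semaphore
def clientMessages[F[_]: Concurrent: Timer, A, K](
  batchSize: Int,
  batchTime: FiniteDuration,
  maxPending: Long
)(
  selector: A => F[K]
): Pipe[F, A, (K, Chunk[A])] = { in =>
  // Split by key, batch each key's sub-stream, and run all sub-streams concurrently
  val batches: Pipe[F, A, (K, Chunk[A])] = _.through(groupBy(selector)).map {
    case (k, kas) =>
      kas.groupWithin(batchSize, batchTime).map(c => k -> c)
  }.parJoinUnbounded
  // Bound the elements in flight: take a permit per input element,
  // give the permits back once that element's batch has been emitted
  Stream.eval(Semaphore[F](maxPending)).flatMap { pending =>
    in.evalTap(_ => pending.acquire)
      .through(batches)
      .evalTap(kc => pending.releaseN(kc._2.size.toLong))
  }
}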
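The permit accounting in clientMessages can be tried in isolation. This is a sketch under stated assumptions (fs2 1.0.x, cats-effect 2.x; the object name, limits and input are hypothetical, and chunkN stands in for the groupBy + groupWithin batching): one permit is taken per element before it enters the pipeline and a batch's permits are returned once the batch is emitted, so at most maxPending elements are buffered at any time.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Semaphore
import fs2.Stream

object BoundedInFlight extends IOApp {
  val maxPending = 4L // hypothetical limit on buffered elements
  val batchSize = 3   // hypothetical batch size; with chunkN it must not exceed maxPending

  def program: IO[List[Int]] =
    Stream.eval(Semaphore[IO](maxPending)).flatMap { pending =>
      Stream.range(0, 10).covary[IO]
        .evalTap(_ => pending.acquire)                 // one permit per element
        .chunkN(batchSize)                             // stand-in for the per-key batches
        .evalTap(c => pending.releaseN(c.size.toLong)) // batch emitted: return its permits
        .map(_.size)
    }.compile.toList

  def run(args: List[String]): IO[ExitCode] =
    program.flatMap(sizes => IO(println(sizes))).map(_ => ExitCode.Success)
}
```

One caveat on the design: chunkN only emits when a batch is full or the stream ends, so with fewer permits than the batch size it would wait forever for elements that cannot acquire a permit. In clientMessages it is the timeout in groupWithin that flushes partial batches and returns their permits.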
The alleycats project seems deprecated now, and I failed to find a Traverse instance for Map, so the simplest solution is to do it at the List level:
tate.get.flatMap(_.toList.traverse(_._2.enqueue1(None))).map(_ => ())
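A self-contained sketch of the same List-level cleanup, assuming fs2 1.0.x and cats-effect 2.x (the object name and queue contents are hypothetical). It goes through values.toList because, as far as I know, cats core gives Map only an UnorderedTraverse instance, which needs a CommutativeApplicative that IO is not. traverse_ discards the results, so the trailing .map(_ => ()) is not needed.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.concurrent.Queue

object CleanupWithoutAlleycats extends IOApp {
  type Queues = Map[Int, Queue[IO, Option[Int]]]

  // Same job as the gist's cleanup: send the None terminator to every queue
  def cleanup(st: Ref[IO, Queues]): IO[Unit] =
    st.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))

  def program: IO[List[List[Int]]] = for {
    q0 <- Queue.unbounded[IO, Option[Int]]
    q1 <- Queue.unbounded[IO, Option[Int]]
    st <- Ref.of[IO, Queues](Map(0 -> q0, 1 -> q1))
    _  <- q0.enqueue1(Some(2))
    _  <- q1.enqueue1(Some(1))
    _  <- cleanup(st)
    // Each drain terminates only because cleanup enqueued None into its queue
    r0 <- q0.dequeue.unNoneTerminate.compile.toList
    r1 <- q1.dequeue.unNoneTerminate.compile.toList
  } yield List(r0, r1)

  def run(args: List[String]): IO[ExitCode] =
    program.flatMap(r => IO(println(r))).map(_ => ExitCode.Success)
}
```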