Skip to content

Instantly share code, notes, and snippets.

@tbekas
Last active November 9, 2021 20:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tbekas/f18f245d2702542a439ffd83d462f55e to your computer and use it in GitHub Desktop.
Save tbekas/f18f245d2702542a439ffd83d462f55e to your computer and use it in GitHub Desktop.
Parallel stream with discriminator, question
package org.example
import cats.effect.std.Random
import cats.effect.{ExitCode, IO, IOApp, Temporal}
import cats.syntax.all._
import cats.{Applicative, Monad}
import fs2._
import scala.concurrent.duration._
/** Demo app: splits a small keyed stream into one sub-stream per key, delays
  * each element of every sub-stream by a random amount, and prints the
  * elements produced by joining the sub-streams concurrently.
  */
object GitterQuestion extends IOApp {

  /** Builds the pairs `("a", 1) … ("c", 3)`, derives one randomly delayed
    * sub-stream per key, joins them with `parJoin(100)` and prints every
    * element, finishing with `ExitCode.Success`.
    */
  override def run(args: List[String]): IO[ExitCode] =
    Random.scalaUtilRandom[IO].flatMap { implicit random =>
      val keys = List("a", "b", "c")

      // Same nine pairs, in the same order: all of "a", then "b", then "c".
      val pairs = for {
        key   <- keys
        index <- List(1, 2, 3)
      } yield (key, index)

      val flat = Stream.emits(pairs).covary[IO]

      // Sub-stream holding only the pairs for `key`, each one randomly delayed.
      def delayedFor(key: String): Stream[IO, (String, Int)] =
        flat.filter(_._1 === key).through(rndDelay[IO, (String, Int)])

      Stream
        .emits(keys)
        .map(delayedFor)
        .parJoin(100)
        .printlns
        .compile
        .drain
        .as(ExitCode.Success)
    }

  /** Pipe that re-emits every incoming element unchanged, after sleeping for
    * the number of seconds given by a fresh `Random[F].nextDouble` draw.
    *
    * @tparam F effect type providing randomness and sleeping
    * @tparam A element type, passed through untouched
    */
  def rndDelay[F[_]: Monad: Random: Temporal, A]: Pipe[F, A, A] =
    _.evalMap { element =>
      for {
        secs <- Random[F].nextDouble
        _    <- Temporal[F].sleep(secs.seconds)
      } yield element
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment