Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Created June 8, 2018 06:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gvolpe/2fca17d92f3ac0050472497d3ba3af90 to your computer and use it in GitHub Desktop.
Save gvolpe/2fca17d92f3ac0050472497d3ba3af90 to your computer and use it in GitHub Desktop.
import cats.effect.{ExitCode, IO, IOApp}
import cats.instances.list._
import cats.syntax.all._
import fs2._
import scala.concurrent.duration._
object jobs extends IOApp {
val largeStream: Stream[IO, Int] = Stream.range(0, 100).covary[IO]
def listParallelProcessor(list: List[Int]): Stream[IO, Unit] =
Stream.eval(list.parTraverse(x => IO(println(s"Processing: $x"))).void)
val p1: Stream[IO, Unit] =
largeStream
.segmentN(10)
.flatMap(s => listParallelProcessor(s.force.toList) ++ Stream.sleep_[IO](2.seconds))
val sinkProcessor: Sink[IO, Int] = _.evalMap(x => IO(println(s"Processing: $x")))
val p2: Stream[IO, Unit] =
largeStream
.segmentN(10)
.map(s => Stream.segment(s).covary[IO].to(sinkProcessor) ++ Stream.sleep_[IO](2.seconds))
.join(2)
override def run(args: List[String]): IO[ExitCode] =
p2.compile.drain.as(ExitCode.Success)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment