Skip to content

Instantly share code, notes, and snippets.

@avakhrenev
Last active January 22, 2019 14:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save avakhrenev/3325d924f3b47a6f887d95022cce06a2 to your computer and use it in GitHub Desktop.
Save avakhrenev/3325d924f3b47a6f887d95022cce06a2 to your computer and use it in GitHub Desktop.
fs2 runConcat: evaluate two streams asynchronously and concatenate their results.
import cats.effect.{Effect, IO}
import fs2._
import scala.concurrent.ExecutionContext
/** Evaluates `first` and `second` concurrently while emitting their output in concatenation order.
  *
  * An asynchronous `uncons` is requested from both streams up front. Segments produced by
  * `first` are emitted as they arrive. `second` keeps being pulled asynchronously only until it
  * yields its first non-empty segment; that segment is held back, `first` is drained to its end,
  * and only then are the held-back segment and the remainder of `second` emitted.
  *
  * @param first  stream whose elements are emitted first
  * @param second stream whose elements are emitted after `first` has ended
  * @param F      effect instance for `F` (presumably required by `unconsAsync` -- confirm against fs2)
  * @param ec     execution context (presumably used by `unconsAsync`/`race` -- confirm against fs2)
  * @return a stream emitting the elements of `first` followed by the elements of `second`
  */
def runConcat[F[_], A](first: Stream[F, A], second: Stream[F, A])(
    implicit F: Effect[F],
    ec: ExecutionContext): Stream[F, A] = {

  // A pending asynchronous `uncons`: `None` once the stream has ended, otherwise the next
  // segment together with the rest of the stream.
  type Step = AsyncPull[F, Option[(Segment[A, Unit], Stream[F, A])]]

  // Awaits the pending step, emits its segment (if any) and then echoes the rest of that stream.
  def drain(pending: Step): Pull[F, A, Unit] =
    pending.pull.flatMap {
      case None                  => Pull.done
      case Some((segment, rest)) => Pull.output(segment) >> rest.pull.echo
    }

  // Races the two pending steps and reacts to whichever completes first.
  def loop(firstStep: Step, secondStep: Step): Pull[F, A, Unit] =
    firstStep.race(secondStep).pull.flatMap {
      case Left(None) =>
        // The first stream has ended: switch over to the second one for good.
        drain(secondStep)

      case Left(Some((segment, firstRest))) =>
        // The first stream produced output: emit it and keep racing against the same
        // pending step of the second stream.
        Pull.output(segment) >> firstRest.pull.unconsAsync.flatMap(next => loop(next, secondStep))

      case Right(None) =>
        // The second stream ended without holding anything back: only the first one is left.
        drain(firstStep)

      case Right(Some((segment, secondRest))) =>
        segment.uncons1 match {
          case Right((head, tail)) =>
            // The second stream has real output: stop pulling it, finish the first stream,
            // then emit the held-back segment and the remainder of the second stream.
            drain(firstStep) >> Pull.output(tail.cons(head)) >> secondRest.pull.echo

          case Left(_) =>
            // Empty segment: nothing to hold back, so keep pulling from both streams.
            secondRest.pull.unconsAsync.flatMap(next => loop(firstStep, next))
        }
    }

  first.pull.unconsAsync.flatMap { firstStep =>
    second.pull.unconsAsync.flatMap(secondStep => loop(firstStep, secondStep))
  }.stream
}
// Let's test it.
import ExecutionContext.Implicits._
import scala.concurrent.duration._
// Demo run: stream `a` emits "a", "b", "c" with a 3-second sleep before each element (plus an
// initial 3-second sleep); stream `b` prints a progress line once per second, three times,
// before each of its two "d", "e", "f" batches. Every element coming out of `runConcat` is printed.
Scheduler[IO](3)
  .flatMap { scheduler =>
    // Three one-second ticks, each printing a progress line; emits no elements itself.
    val working =
      scheduler
        .awakeEvery[IO](1.seconds)
        .take(3)
        .flatMap(_ => Stream.eval_(IO(println("--- WORKING..."))))

    // "a", "b", "c" re-emitted one segment at a time, each preceded by a 3-second sleep.
    val slowLetters =
      Stream("a", "b", "c")
        .segmentLimit(1)
        .flatMap(Stream.segment)
        .covary[IO]
        .flatMap(letter => scheduler.sleep_[IO](3.seconds) ++ Stream(letter))

    // "d", "e", "f" re-emitted one segment at a time.
    val segmentedTail =
      Stream("d", "e", "f")
        .segmentLimit(1)
        .flatMap(Stream.segment)
        .covary[IO]

    val a = scheduler.sleep_[IO](3.seconds) ++ slowLetters
    val b = working ++ segmentedTail ++ working ++ Stream("d", "e", "f")

    runConcat(a, b)
  }
  .evalMap(e => IO(println(e)))
  .run
  .unsafeRunSync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment