Last active July 20, 2020 08:37
Bubble Errors Up From First Segement
import cats.effect._
import cats.implicits._
import fs2._
object Reconstitute {
// Simplest method I can find at the current moment
def tryFetch1[F[_]: Sync, A](s: Stream[F, A]): F[Stream[F, A]] =
.flatMap {
case None => Pull.output1(Stream.empty)
case Some((hd, tl)) => Pull.output1(Stream.segment(hd) ++ tl)
def headSegmentAndStream[F[_]: Sync, A](s: Stream[F, A]): F[Option[(Segment[A, Unit], Stream[F, A])]] = {
case Some((segment, stream)) => Pull.output1((segment, stream)) >> Pull.done
case None => Pull.done
def bubblesErrorsOnFirstSegment[F[_]: Sync, A](s: Stream[F, A]): F[Stream[F, A]] = {
case (seg, stream) => Stream.segment(seg) ++ stream
// Examples
def shouldReturnHappyStream: IO[Stream[IO, Int]] = bubblesErrorsOnFirstSegment{
(Stream(1) ++ Stream(1)).covary[IO]
def shouldReturnStreamThatWillBlowUpWhenRun: IO[Stream[IO, Int]] = bubblesErrorsOnFirstSegment{
(Stream(1).covary[IO] ++ Stream.raiseError(new Throwable("Boom!")))
def shouldReturnEmptyStream: IO[Stream[IO, Int]] = {
val s : Stream[IO, Int] = Stream.empty.covary[IO]
def shouldFailOuterIO: IO[Stream[IO, Int]] = {
val s : Stream[IO, Int] = Stream.raiseError(new Throwable("Boom!")).covary[IO]
def main(args: Array[String]): Unit = {
.flatMap(s => IO(println(s"Happy Stream Out $s")))
.flatTap(s => IO(println(s"Stream That Will Eventuall Blow Up $s")))
.flatMap(s => s.compile.drain.attempt.flatMap{e => IO(println(s"Stream That Should Blow Up Made $e"))})
.flatTap(s => IO(println(s"Empty Stream Returns Correctly $s")))
.flatTap(s => IO(println(s"Attempted to get Stream, Outer IO Should Fail $s")))
