Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Last active July 20, 2020 08:37
Show Gist options
  • Save ChristopherDavenport/92f5276ee3011558b37a955e9353121d to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/92f5276ee3011558b37a955e9353121d to your computer and use it in GitHub Desktop.
Bubble Errors Up From First Segement
import cats.effect._
import cats.implicits._
import fs2._
object Reconstitute {
// Simplest method I can find at the current moment
def tryFetch1[F[_]: Sync, A](s: Stream[F, A]): F[Stream[F, A]] =
s.pull.uncons
.flatMap {
case None => Pull.output1(Stream.empty)
case Some((hd, tl)) => Pull.output1(Stream.segment(hd) ++ tl)
}
.stream
.compile
.last
.map(_.getOrElse(Stream.empty))
def headSegmentAndStream[F[_]: Sync, A](s: Stream[F, A]): F[Option[(Segment[A, Unit], Stream[F, A])]] = {
s.pull.uncons.flatMap{
case Some((segment, stream)) => Pull.output1((segment, stream)) >> Pull.done
case None => Pull.done
}.stream.compile.last
}
def bubblesErrorsOnFirstSegment[F[_]: Sync, A](s: Stream[F, A]): F[Stream[F, A]] = {
headSegmentAndStream(s)
.map(
Stream(_).covary[F].unNone.flatMap{
case (seg, stream) => Stream.segment(seg) ++ stream
})
}
// Examples
def shouldReturnHappyStream: IO[Stream[IO, Int]] = bubblesErrorsOnFirstSegment{
(Stream(1) ++ Stream(1)).covary[IO]
}
def shouldReturnStreamThatWillBlowUpWhenRun: IO[Stream[IO, Int]] = bubblesErrorsOnFirstSegment{
(Stream(1).covary[IO] ++ Stream.raiseError(new Throwable("Boom!")))
}
def shouldReturnEmptyStream: IO[Stream[IO, Int]] = {
val s : Stream[IO, Int] = Stream.empty.covary[IO]
bubblesErrorsOnFirstSegment(s)
}
def shouldFailOuterIO: IO[Stream[IO, Int]] = {
val s : Stream[IO, Int] = Stream.raiseError(new Throwable("Boom!")).covary[IO]
bubblesErrorsOnFirstSegment(s)
}
def main(args: Array[String]): Unit = {
shouldReturnHappyStream
.flatMap(s => IO(println(s"Happy Stream Out $s")))
.unsafeRunSync
shouldReturnStreamThatWillBlowUpWhenRun
.flatTap(s => IO(println(s"Stream That Will Eventuall Blow Up $s")))
.flatMap(s => s.compile.drain.attempt.flatMap{e => IO(println(s"Stream That Should Blow Up Made $e"))})
.unsafeRunSync
shouldReturnEmptyStream
.flatTap(s => IO(println(s"Empty Stream Returns Correctly $s")))
.unsafeRunSync
shouldFailOuterIO
.attempt
.flatTap(s => IO(println(s"Attempted to get Stream, Outer IO Should Fail $s")))
.unsafeRunSync
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment