Skip to content

Instantly share code, notes, and snippets.

@Mithrandir0x
Created June 26, 2019 09:04
Show Gist options
  • Save Mithrandir0x/a4f178c79e95239e635552036aa4ff68 to your computer and use it in GitHub Desktop.
Save Mithrandir0x/a4f178c79e95239e635552036aa4ff68 to your computer and use it in GitHub Desktop.
import cats.effect.{ContextShift, IO, Timer}
import fs2.concurrent.SignallingRef
import fs2.Stream
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.global
object ConcurrentlySnippet {
def enableSignal(signal: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
Stream.eval(IO { println("Enabling signal") }) >> Stream.eval { signal.set(true) } >> Stream.eval(IO { println("Enabled signal") })
def repeatNumber(n: Long): Stream[IO, Long] =
Stream.emit(n).repeat.covary[IO]
def fail4(t: Throwable): Stream[IO, Long] =
Stream.eval {
IO {
throw t
}
}
def test4_withFatalException()(implicit cs: ContextShift[IO], timer: Timer[IO]): Unit = {
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()
def toStream: Stream[IO, AnyVal] = {
Stream(
(Stream.sleep(5.seconds) >> fail4(new OutOfMemoryError("Hello World")) >> enableSignal(signal))
.handleErrorWith[IO, Unit](t => Stream.eval(IO { println(t) })),
(repeatNumber(5L).map(println) >> Stream.sleep(1.second))
.interruptWhen(signal)
).parJoin(10)
}
toStream.interruptAfter(10.seconds).compile.drain.unsafeRunSync()
}
def main(args: Array[String]): Unit = {
implicit val cs: ContextShift[IO] = IO.contextShift(global)
implicit val timer: Timer[IO] = IO.timer(global)
val start = System.currentTimeMillis()
test4_withFatalException()
println(System.currentTimeMillis() - start)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment