Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active March 20, 2018 07:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gvolpe/517a3ca0965f16773f01399a2e872bea to your computer and use it in GitHub Desktop.
Save gvolpe/517a3ca0965f16773f01399a2e872bea to your computer and use it in GitHub Desktop.
Stream Bracket (fs2)
import cats.effect.IO
import cats.syntax.apply._
import fs2.{Scheduler, Stream, StreamApp}
import fs2.StreamApp.ExitCode
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Demo application: runs three bracketed streams through `join(3)`, where the
 * middle one ("B") raises an error right after using its resource.
 */
object Streaming extends StreamApp[IO] {

  /** Suspends a `println` of `value` in `IO`. */
  def putStrLn(value: String): IO[Unit] = IO(println(value))

  /** Logs the acquisition of `name` and then yields `name` itself as the "resource". */
  def acquireResource(name: String): IO[String] = {
    val announce = putStrLn(s"Acquiring resource: $name")
    announce *> IO.pure(name)
  }

  /** A bracketed stream that acquires `name`, logs its use, and logs its release. */
  def program(name: String): Stream[IO, Unit] = {
    val use: String => Stream[IO, Unit] =
      res => Stream.eval(putStrLn(s"Using resource: $res"))
    val release: String => IO[Unit] =
      res => putStrLn(s"Closing resource: $res")

    Stream.bracket(acquireResource(name))(use, release)
  }

  /**
   * Same shape as [[program]], except that after logging the use of the
   * resource the stream raises `new Exception(name)`.
   */
  def failedProgram(name: String): Stream[IO, Unit] = {
    val use: String => Stream[IO, Unit] =
      res =>
        Stream
          .eval(putStrLn(s"Using resource: $res"))
          .flatMap(_ => Stream.raiseError(new Exception(name)))
    val release: String => IO[Unit] =
      res => putStrLn(s"Closing resource: $res")

    Stream.bracket(acquireResource(name))(use, release)
  }

  /**
   * Entry point: joins programs "A", failing "B" and "C" with up to 3 open at
   * once, discards their output, and appends `ExitCode.Success` afterwards.
   */
  override def stream(args: List[String], requestShutdown: IO[Unit]): fs2.Stream[IO, ExitCode] = {
    val programs = Stream(program("A"), failedProgram("B"), program("C"))
    programs.join(3).drain ++ Stream.emit(ExitCode.Success)
  }
}
/***
* OUTCOME is NOT always the same: join(3) runs the three streams concurrently, so their output can interleave differently on each run. One possible outcome is:
Acquiring resource: C
Acquiring resource: B
Using resource: C
Acquiring resource: A
Using resource: B
Using resource: A
Closing resource: A
Closing resource: C
Closing resource: B
java.lang.Exception: B
--- rest of the stack trace
*/
import cats.effect.IO
import cats.syntax.apply._
import fs2.{Scheduler, Stream, StreamApp}
import fs2.StreamApp.ExitCode
/**
 * Demo application: runs three bracketed streams one inside the other via a
 * for-comprehension, so acquisition/use happens in order A, B, C.
 */
object Streaming extends StreamApp[IO] {

  /**
   * Entry point: sequences programs "A", "B" and "C", discards their output and
   * then emits `ExitCode.Success` explicitly.
   *
   * No `Scheduler` is allocated here: nothing in this pipeline is time-based, so
   * the previously created (and never used) implicit scheduler was dropped.
   */
  override def stream(args: List[String], requestShutdown: IO[Unit]): fs2.Stream[IO, ExitCode] = {
    val sequenced: Stream[IO, Unit] =
      for {
        _ <- program("A")
        _ <- program("B")
        _ <- program("C")
      } yield ()

    // Emit the exit code explicitly instead of ending with an empty stream.
    sequenced.drain ++ Stream.emit(ExitCode.Success)
  }

  /** Suspends a `println` of `value` in `IO`. */
  def putStrLn(value: String): IO[Unit] = IO(println(value))

  /** Logs the acquisition of `name` and then yields `name` itself as the "resource". */
  def acquireResource(name: String): IO[String] =
    putStrLn(s"Acquiring resource: $name") *> IO.pure(name)

  /** A bracketed stream that acquires `name`, logs its use, and logs its release. */
  def program(name: String): Stream[IO, Unit] =
    Stream.bracket(acquireResource(name))(
      // To try the failing case, swap this for: Stream.raiseError(new Exception(name))
      r => Stream.eval(putStrLn(s"Using resource: $r")),
      r => putStrLn(s"Closing resource: $r")
    )
}
/***
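* OUTCOME is always this (the programs are chained sequentially with flatMap, each nested inside the previous one, so resources are closed in reverse order of acquisition):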
Acquiring resource: A
Using resource: A
Acquiring resource: B
Using resource: B
Acquiring resource: C
Using resource: C
Closing resource: C
Closing resource: B
Closing resource: A
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment