Skip to content

Instantly share code, notes, and snippets.

@jmcardon
Created May 15, 2018 05:48
Show Gist options
  • Save jmcardon/2eb006ee3ec5b8af56c4a45c83ceec48 to your computer and use it in GitHub Desktop.
Save jmcardon/2eb006ee3ec5b8af56c4a45c83ceec48 to your computer and use it in GitHub Desktop.
import cats.effect.IO
import cats.implicits._
import fs2._
import fs2.async.mutable.Signal
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import java.util.Random
/**
  * Example application showing how to interrupt a running fs2 stream from the
  * outside by flipping a `Signal[IO, Boolean]`.
  *
  * A stream of quotes is printed while, concurrently, the program sleeps for
  * 15 seconds, sets the signal to `true`, and then waits for the printing
  * stream to finish.
  */
object InterruptExample extends StreamApp[IO] {

  /** A single quote emitted by the simulated source. */
  case class LeeroyJenkins(quote: String)

  /** Pool of quotes that [[simulateEmit]] picks from at random. */
  val leeroyQuotes: Array[String] =
    Array("ALRIGHT LET'S DO THIS", "At least I got chicken", "LEEEEROY....JENNNNKINSSSSSSS")

  /**
    * Builds a stream that produces one randomly chosen [[LeeroyJenkins]] quote
    * for every element of the scheduler's `fixedDelay(1.second)` stream.
    *
    * @param s    scheduler used to drive the fixed-delay ticks
    * @param rand source of randomness used to choose a quote
    * @param ec   execution context required by the scheduler
    * @return a stream of quotes
    */
  def simulateEmit(s: Scheduler, rand: Random)(implicit ec: ExecutionContext): Stream[IO, LeeroyJenkins] =
    s.fixedDelay[IO](1.second).evalMap { _ =>
      // Bound the index by the array's real size so that adding or removing a
      // quote can neither be silently ignored nor index out of bounds.
      IO(rand.nextInt(leeroyQuotes.length)).map(i => LeeroyJenkins(leeroyQuotes(i)))
    }

  /**
    * Prints every quote of `s` to standard output, interrupting `s` according
    * to `shutdownSignal`.
    *
    * @param s              the stream of quotes to consume
    * @param shutdownSignal signal passed to `interruptWhen` on the source stream
    * @param ec             execution context required by `interruptWhen`
    * @return a stream that emits `Unit` once per printed quote
    */
  def consumeUntil(s: Stream[IO, LeeroyJenkins], shutdownSignal: Signal[IO, Boolean])(
      implicit ec: ExecutionContext
  ): Stream[IO, Unit] =
    s.interruptWhen(shutdownSignal)
      .evalMap(l => IO(println(s"Leeroy Jenkins says: ${l.quote}")))

  /**
    * Starts draining `stream` concurrently, sleeps for 15 seconds, sets `sig`
    * to `true`, and finally joins the started fiber.
    *
    * @param stream the stream to drain in the background
    * @param sig    signal that is set to `true` after the sleep
    * @param sc     scheduler used for the 15 second sleep
    * @param ec     execution context used to fork the drain and to sleep
    * @return an `IO` that completes once the forked fiber has been joined
    */
  def runMyStuff(stream: Stream[IO, Unit], sig: Signal[IO, Boolean], sc: Scheduler)(
      implicit ec: ExecutionContext
  ): IO[Unit] =
    for {
      fiber <- stream.compile.drain.start // Fork IO asynchronously so it runs in the background
      _     <- sc.effect.sleep[IO](15.seconds)
      _     <- sig.set(true)
      _     <- fiber.join
    } yield ()

  //Using global for the sake of the example
  import scala.concurrent.ExecutionContext.Implicits.global

  /**
    * Application entry point: allocates a scheduler, a shutdown signal
    * (initially `false`) and a random generator, then runs the example.
    *
    * @param args            command line arguments (unused)
    * @param requestShutdown shutdown hook supplied by `StreamApp` (unused)
    * @return a stream emitting `ExitCode.Success`, or `ExitCode.Error` if the
    *         example fails; in the failure case the cause is printed to stderr
    */
  def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, StreamApp.ExitCode] =
    (for {
      scheduler <- Scheduler[IO](1)
      signal    <- Stream.eval(Signal[IO, Boolean](false))
      rand      <- Stream.eval(IO(new Random()))
      _         <- Stream.eval(runMyStuff(consumeUntil(simulateEmit(scheduler, rand), signal), signal, scheduler))
    } yield StreamApp.ExitCode.Success)
      .handleErrorWith { err =>
        // Report the failure instead of discarding it, then exit with an error code.
        Stream.eval(IO(err.printStackTrace())).map(_ => StreamApp.ExitCode.Error)
      }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment