Skip to content

Instantly share code, notes, and snippets.

@ane
Created January 27, 2019 16:07
Show Gist options
  • Save ane/3118d94e610b58a4e47908fb5f66defd to your computer and use it in GitHub Desktop.
Save ane/3118d94e610b58a4e47908fb5f66defd to your computer and use it in GitHub Desktop.
Interruptible Streams using fs2 and cats-effect
import scala.util.Random
import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.{Clock, ZoneOffset}
import fs2.concurrent.SignallingRef
object InterruptipleQueue extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val clock = Clock.systemUTC()
def interrupter(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Unit] = {
for {
_ <- Stream.fixedDelay(1.seconds) // check every 1 second
_ <- Stream.eval {
val t = clock.instant().atOffset(ZoneOffset.UTC)
val on = t.getSecond <= 30 // the on/off logic (e.g. a light switch)
signallingRef.access flatMap {
case (current, set) =>
// need update?
if (on != current) {
if (on) println(s"Seconds of $t <= 30: turning OFF.")
else println(s"Seconds of $t > 30: turning ON.")
set(on)
} else IO.unit
}
}
} yield ()
}
def printer(signallingRef: SignallingRef[IO, Boolean]): Stream[IO, Int] = {
Stream
.iterateEval(0)(i => IO(i + 1))
.metered(100 millis)
.pauseWhen(signallingRef)
}
(for {
signal <- Stream.eval(SignallingRef[IO, Boolean](false))
ints <- printer(signal).concurrently(interrupter(signal))
} yield ints)
.evalMap(i => IO(println(s"Got $i")))
.compile
.drain
.as(ExitCode.Success)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment