Skip to content

Instantly share code, notes, and snippets.

@n4to4
Created April 10, 2018 04:23
Show Gist options
  • Save n4to4/b998ff612c23496d3197cb9d9450ab1d to your computer and use it in GitHub Desktop.
Save n4to4/b998ff612c23496d3197cb9d9450ab1d to your computer and use it in GitHub Desktop.
fs2
/** Small fs2 demo: prints "hello" repeatedly and interrupts the stream after a fixed delay.
  *
  * Uses an explicit `main` method instead of the `App` trait so that members are
  * initialized by the ordinary object initializer rather than through `delayedInit`.
  */
object Main {
  import cats.effect.IO
  import fs2.{Stream, Scheduler}
  import fs2.async.signalOf
  import fs2.async.mutable.Signal
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Builds a stream transformation that interrupts its input once `f` has elapsed.
    *
    * @param f how long to wait before the interruption signal is set to `true`
    * @tparam A element type of the stream being transformed
    * @return a function from the input stream to a stream that is interrupted
    *         when the internal signal becomes `true`
    */
  def stopAfter[A](f: FiniteDuration): Stream[IO, A] => Stream[IO, A] = in => {
    // Sleeps for `f` on a freshly created Scheduler, then sets the signal to true.
    def close(s: Signal[IO, Boolean]): Stream[IO, Unit] =
      Scheduler[IO](2).flatMap { scheduler =>
        scheduler.sleep_[IO](f) ++ Stream.eval(s.set(true))
      }

    // The signal starts as `false`; `close` runs concurrently with `in` and sets it
    // to `true`, which is the condition `interruptWhen` is watching.
    Stream.eval(signalOf[IO, Boolean](false)).flatMap { end =>
      in.interruptWhen(end).concurrently(close(end))
    }
  }

  /** Prints "hello" repeatedly, passed through `stopAfter(1.millis)`. */
  val s: Stream[IO, Unit] =
    Stream
      .repeatEval(IO(println("hello")))
      .through(stopAfter(1.millis))

  /** Program entry point: runs `s` to completion, discarding its output. */
  def main(args: Array[String]): Unit =
    s.compile.drain.unsafeRunSync()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment