Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created September 27, 2019 18:40
Show Gist options
  • Save Daenyth/5cad4e68ed9804ca3c855328f4e0b0ad to your computer and use it in GitHub Desktop.
Save Daenyth/5cad4e68ed9804ca3c855328f4e0b0ad to your computer and use it in GitHub Desktop.
PauseButton for cats-effect / fs2
import java.time.Instant
import cats.effect.concurrent.Deferred
import cats.effect.implicits._
import cats.effect.{Concurrent, Timer, Clock}
import cats.implicits._
import fs2.concurrent.{Signal, SignallingRef}
import scala.concurrent.duration._
/** A pause switch whose "paused" state expires by itself at a caller-chosen instant.
  *
  * Pausing is requested with [[pauseUntil]]; the current state can be observed through
  * [[isPaused]].
  */
trait PauseButton[F[_]] {

  /** Observable view of the pause state. */
  def isPaused: Signal[F, Boolean]

  /** Requests that the button be paused until `time`; the effect returns without waiting.
    *
    * If `time` lies in the future the button becomes paused. It un-pauses on its own once
    * `time` has passed, unless a later `pauseUntil` call asks for a deadline that is even
    * farther in the future, in which case the later deadline applies.
    *
    * @param time the instant after which the button should no longer be paused
    * @return a `Deferred` that is completed when the button becomes unpaused
    */
  def pauseUntil(time: Instant): F[Deferred[F, Unit]]
}
object PauseButton {

  /** Creates a [[PauseButton]] that starts out un-paused.
    *
    * The state is held in a `SignallingRef`: `None` means "not paused"; `Some((deadline, gate))`
    * means "paused until `deadline`", where `gate` is completed when the pause ends.
    */
  def create[F[_]: Concurrent: Timer]: F[PauseButton[F]] =
    SignallingRef[F, Option[(Instant, Deferred[F, Unit])]](None)
      .map(new PauseButtonImpl[F](_))

  private class PauseButtonImpl[F[_]](
      signal: SignallingRef[F, Option[(Instant, Deferred[F, Unit])]]
  )(implicit F: Concurrent[F], timer: Timer[F])
      extends PauseButton[F] {

    /** Pauses until `time`, or returns an already-completed gate when `time` is in the past.
      *
      * - Not paused: installs `(time, gate)` and schedules the automatic reset.
      * - Paused with an earlier deadline: installs a new gate for `time`, schedules a reset
      *   for it, and chains the previous gate so it completes when the new one does.
      * - Paused with the same or a later deadline: leaves the state alone and returns the
      *   existing gate.
      *
      * Note that when `time` is already in the past the state is not consulted at all, so the
      * returned gate is complete even if the button is currently paused by an earlier call.
      */
    override def pauseUntil(time: Instant): F[Deferred[F, Unit]] =
      getNow.flatMap { now =>
        if (now.isAfter(time))
          Deferred[F, Unit].flatTap(_.complete(()))
        else
          // The state update and the scheduling of its reset must happen together: if the
          // caller were cancelled after `modify` but before the returned action ran, the
          // button would stay paused forever with nothing left to un-pause it. Everything
          // in this region only forks fibers, so it never blocks.
          F.uncancelable(
            signal
              .modify[F[Deferred[F, Unit]]] {
                case None =>
                  // Plain allocation; harmless if `modify` retries this function.
                  val gate = Deferred.unsafe[F, Unit]
                  (time, gate).some -> resetAt(time).as(gate)
                case Some((oldTimeout, oldGate)) =>
                  if (oldTimeout.isBefore(time)) {
                    val newGate = Deferred.unsafe[F, Unit]
                    // This completes the old gate when the new gate completes
                    val closeOldGateLater =
                      (newGate.get >> oldGate.complete(())).start.void
                    val next = time -> newGate
                    val action = resetAt(time) >> closeOldGateLater
                    next.some -> action.as(newGate)
                  } else
                    (oldTimeout, oldGate).some -> oldGate.pure[F]
              }
              .flatten
          )
      }

    /** Forks a fiber that sleeps until `time` and then un-pauses the button.
      *
      * The reset only fires if the stored deadline is still exactly `time`; if a later call
      * has extended the pause, this fiber wakes up and does nothing.
      */
    private def resetAt(time: Instant): F[Unit] =
      getNow.flatMap { now =>
        val resetLater =
          timer.sleep(sleepUntil(now, time)) >>
            signal
              .modify[F[Unit]] {
                case Some((resetTime, gate)) if resetTime == time =>
                  none[(Instant, Deferred[F, Unit])] -> gate.complete(())
                case s =>
                  s -> F.unit
              }
              .flatten
        resetLater.start.void
      }

    /** Time to sleep from `now` until `time`, clamped to what a `FiniteDuration` can hold.
      *
      * `now` is read again after the check in `pauseUntil`, so the interval may have become
      * negative; that is treated as "no sleep". Deadlines too far away to be expressed in
      * nanoseconds (where `java.time.Duration.toNanos` would throw `ArithmeticException`)
      * are capped at the largest representable duration.
      */
    private def sleepUntil(now: Instant, time: Instant): FiniteDuration = {
      val remaining = java.time.Duration.between(now, time)
      val longest = java.time.Duration.ofNanos(Long.MaxValue)
      if (remaining.isNegative) Duration.Zero
      else if (remaining.compareTo(longest) >= 0) Duration.fromNanos(Long.MaxValue)
      else Duration.fromNanos(remaining.toNanos)
    }

    /** Current wall-clock time from the `Clock` in scope, at millisecond resolution. */
    private val getNow =
      Clock[F]
        .realTime(MILLISECONDS)
        .map(Instant.ofEpochMilli)

    /** `true` while a deadline is stored, `false` otherwise. */
    override val isPaused: Signal[F, Boolean] =
      signal.map(_.isDefined)
  }
}
import cats.implicits._
import org.scalatest.{AsyncFunSpec, Inspectors, Matchers, Assertion, AsyncTestSuite}
import cats.effect.laws.util.TestContext
import cats.effect._
import org.scalactic.source.Position
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
/** Behavioural tests for `PauseButton`.
  *
  * Each test body is an `IO[Assertion]`; the implicit conversion mixed in from
  * `TestContextShiftTest` turns it into the `Future[Assertion]` that `AsyncFunSpec` expects,
  * so the `timer` used here is the one supplied by that trait.
  */
class PauseButtonSpec
    extends AsyncFunSpec with Matchers with TestContextShiftTest with Inspectors {

  // Fully qualified: the wildcard imports used by this spec provide cats instances and
  // syntax (e.g. `>=`) but not the `Order` type name itself.
  private implicit val javaInstantOrdering: cats.Order[java.time.Instant] =
    cats.Order.by(t => (t.getEpochSecond, t.getNano))

  describe("PauseButton") {
    describe("pauseUntil") {
      it("returns a Deferred that completes after the time specified") {
        for {
          now <- instantNow
          later = now.plusSeconds(10)
          pb <- PauseButton.create[IO]
          gate <- pb.pauseUntil(later)
          _ <- gate.get
          end <- instantNow
        } yield assert(end >= later)
      }

      describe("multiple calls") {
        it("extends (sooner, later) calls to (later, later)") {
          for {
            now <- instantNow
            sooner = now.plusSeconds(10)
            later = sooner.plusSeconds(10)
            pb <- PauseButton.create[IO]
            gate <- pb.pauseUntil(sooner)
            _ <- pb.pauseUntil(later)
            // The gate from the first call must wait for the extended deadline.
            _ <- gate.get
            end <- instantNow
          } yield {
            assert(end >= later)
          }
        }

        it("extends (later, sooner) calls to (later, later)") {
          for {
            now <- instantNow
            sooner = now.plusSeconds(10)
            later = sooner.plusSeconds(10)
            pb <- PauseButton.create[IO]
            _ <- pb.pauseUntil(later)
            // Asking for an earlier deadline must not shorten the existing pause.
            gate2 <- pb.pauseUntil(sooner)
            _ <- gate2.get
            end <- instantNow
          } yield {
            assert(end >= later)
          }
        }
      }

      it("doesn't pause when 'until' is in the past") {
        for {
          startTime <- instantNow
          pb <- PauseButton.create[IO]
          _ <- timer.sleep(1.second)
          gate <- pb.pauseUntil(startTime)
          afterPause <- instantNow
          _ <- gate.get
          afterGate <- instantNow
          // No time may elapse while waiting on the gate: it must already be complete.
        } yield assert(afterPause == afterGate)
      }
    }

    describe("isPaused") {
      it("emits pause status that updates when paused") {
        for {
          now <- instantNow
          later = now.plusSeconds(10)
          pb <- PauseButton.create[IO]
          p1 <- pb.isPaused.get
          gate <- pb.pauseUntil(later)
          p2 <- pb.isPaused.get
          _ <- gate.get
          p3 <- pb.isPaused.get
        } yield {
          p1 shouldBe false
          p2 shouldBe true
          p3 shouldBe false
        }
      }
    }
  }

  // NOTE(review): `teikametrics.clock.instantNow` is a project-local helper not shown here;
  // presumably it reads the current `Instant` from the implicit clock/timer — confirm.
  private val instantNow = teikametrics.clock.instantNow[IO]
}
/** Mix-in for async suites that run `IO`-based tests on a cats-effect `TestContext`.
  *
  * Supplies a `ContextShift[IO]` and a `Timer[IO]` backed by the same `TestContext`, plus an
  * implicit conversion that lets a test body be written as an `IO[Assertion]`.
  */
trait TestContextShiftTest { this: AsyncTestSuite =>

  protected final val ctx = TestContext()

  protected final implicit val CS: ContextShift[IO] = ctx.contextShift[IO](IO.ioEffect)

  protected final implicit val timer: Timer[IO] = ctx.timer[IO]

  /** Runs `test`, advances the test context by 1000 days, and returns the resulting future.
    *
    * If the future still has no value after the tick, the test is failed with a message
    * listing the tasks that remain queued in the context.
    *
    * @param test the test body to execute
    * @param pos  source position reported when the deadlock failure is raised
    */
  final implicit def extremelyUnsafeIOAssertionToFuture(
      test: IO[Assertion]
  )(implicit pos: Position): Future[Assertion] = {
    val outcome: Future[Assertion] = test.unsafeToFuture()
    // Advance the clock so that everything the test scheduled gets a chance to run.
    ctx.tick(1000.day)
    outcome.value match {
      case Some(_) =>
        outcome
      case None =>
        fail(
          s"""Test probably deadlocked. Test `IO` didn't resolve after simulating 1000 days of time.
             | Remaining tasks: ${ctx.state.tasks}""".stripMargin
        )(pos)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment