Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created September 10, 2021 14:11
Show Gist options
  • Save djspiewak/bfed5b1a99f05200314643f466830382 to your computer and use it in GitHub Desktop.
Save djspiewak/bfed5b1a99f05200314643f466830382 to your computer and use it in GitHub Desktop.
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats.effect
package testkit
import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler}
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
/**
* Implements a fully functional single-threaded runtime for [[cats.effect.IO]]. When using the
* [[runtime]] provided by this type, `IO` programs will be executed on a single JVM thread,
* ''similar'' to how they would behave if the production runtime were configured to use a
* single worker thread regardless of underlying physical thread count. Calling one of the
* `unsafeRun` methods on an `IO` will submit it to the runtime for execution, but nothing will
* actually evaluate until one of the ''tick'' methods on this class are called. If the desired
* behavior is to simply run the `IO` fully to completion within the mock environment,
* respecting monotonic time, then [[tickAll]] is likely the desired method.
*
* Where things ''differ'' from the production runtime is in two critical areas.
*
* First, whenever multiple fibers are outstanding and ready to be resumed, the `TestControl`
* runtime will ''randomly'' choose between them, rather than taking them in a first-in,
* first-out fashion as the default production runtime will. This semantic is intended to
* simulate different scheduling interleavings, ensuring that race conditions are not
* accidentally masked by deterministic execution order.
*
* Second, within the context of the `TestControl`, ''time'' is very carefully and artificially
* controlled. In a sense, this runtime behaves as if it is executing on a single CPU which
* performs all operations infinitely fast. Any fibers which are immediately available for
* execution will be executed until no further fibers are available to run (assuming the use of
* `tickAll`). Through this entire process, the current clock (which is exposed to the program
* via [[IO.realTime]] and [[IO.monotonic]]) will remain fixed at the very beginning, meaning
* that no time is considered to have elapsed as a consequence of ''compute''.
*
* Note that the above means that it is relatively easy to create a deadlock on this runtime
* with a program which would not deadlock on either the JVM or JavaScript:
*
* {{{
* // do not do this!
* IO.cede.foreverM.timeout(10.millis)
* }}}
*
* The above program spawns a fiber which yields forever, setting a timeout for 10 milliseconds
* which is ''intended'' to bring the loop to a halt. However, because the immediate task queue
* will never be empty, the test runtime will never advance time, meaning that the 10
* milliseconds will never elapse and the timeout will not be hit. This will manifest as the
* [[tick]] and [[tickAll]] functions simply running forever and not returning if called.
* [[tickOne]] is safe to call on the above program, but it will always return `true`.
*
* In order to advance time, you must use the [[advance]] method to move the clock forward by a
* specified offset (which must be greater than 0). If you use the `tickAll` method, the clock
* will be automatically advanced by the minimum amount necessary to reach the next pending
* task. For example, if the program contains an [[IO.sleep]] for `500.millis`, and there are no
* shorter sleeps, then time will need to be advanced by 500 milliseconds in order to make that
* fiber eligible for execution.
*
* At this point, the process repeats until all tasks are exhausted. If the program has reached
* a concluding value or exception, then it will be produced from the `unsafeRun` method which
* scheduled the `IO` on the runtime (pro tip: do ''not'' use `unsafeRunSync` with this runtime,
* since it will always result in immediate deadlock). If the program does ''not'' produce a
* result but also has no further work to perform (such as a program like [[IO.never]]), then
* `tickAll` will return but no result will have been produced by the `unsafeRun`. If this
* happens, [[isDeadlocked]] will return `true` and the program is in a "hung" state. This same
* situation on the production runtime would have manifested as an asynchronous deadlock.
*
* You should ''never'' use this runtime in a production code path. It is strictly meant for
* testing purposes, particularly testing programs that involve time functions and [[IO.sleep]].
*
* Due to the semantics of this runtime, time will behave entirely consistently with a plausible
* production execution environment provided that you ''never'' observe time via side-effects,
* and exclusively through the [[IO.realTime]], [[IO.monotonic]], and [[IO.sleep]] functions
* (and other functions built on top of these). From the perspective of these functions, all
* computation is infinitely fast, and the only effect which advances time is [[IO.sleep]] (or
* if something external, such as the test harness, calls the [[advance]] method). However, an
* effect such as `IO(System.currentTimeMillis())` will "see through" the illusion, since the
* system clock is unaffected by this runtime. This is one reason why it is important to always
* and exclusively rely on `realTime` and `monotonic`, either directly on `IO` or via the
* typeclass abstractions.
*
* WARNING: ''Never'' use this runtime on programs which use the [[IO#evalOn]] method! The test
* runtime will detect this situation as an asynchronous deadlock.
*
* @see
* [[cats.effect.unsafe.IORuntime]]
* @see
* [[cats.effect.kernel.Clock]]
* @see
* [[tickAll]]
*/
final class TestControl[+A] private (ctx: TestContext, _results: Future[A]) {
val results: IO[Option[Either[Throwable, A]]] =
IO(_results.value.map(_.toEither))
/**
* Returns the minimum time which must elapse for a fiber to become eligible for execution. If
* fibers are currently eligible for execution, the result will be `Duration.Zero`.
*/
val nextInterval: IO[FiniteDuration] =
IO(ctx.nextInterval())
/**
* Advances the runtime clock by the specified amount (which must be positive). Does not
* execute any fibers, though may result in some previously-sleeping fibers to become pending
* and eligible for execution in the next [[tick]].
*/
def advance(time: FiniteDuration): IO[Unit] =
IO(ctx.advance(time))
/**
* A convenience method which advances time by the specified amount and then ticks once. Note
* that this method is very subtle and will often ''not'' do what you think it should. For
* example:
*
* {{{
* // will never print!
* val program = IO.sleep(100.millis) *> IO.println("Hello, World!")
*
* val control = TestControl()
* program.unsafeRunAndForget()(control.runtime)
*
* control.advanceAndTick(1.second)
* }}}
*
* This is very subtle, but the problem is that time is advanced ''before'' the [[IO.sleep]]
* even has a chance to get scheduled! This means that when `sleep` is finally submitted to
* the runtime, it is scheduled for the time offset equal to `1.second + 100.millis`, since
* time was already advanced `1.second` before it had a chance to submit. Of course, time has
* only been advanced by `1.second`, thus the `sleep` never completes and the `println` cannot
* ever run.
*
* There are two possible solutions to this problem: either call [[tick]] ''first'' (before
* calling `advanceAndTick`) to ensure that the `sleep` has a chance to schedule itself, or
* simply use [[tickAll]] if you do not need to run assertions between time windows.
*
* @see
* [[advance]]
* @see
* [[tick]]
*/
def advanceAndTick(time: FiniteDuration): IO[Unit] =
IO(ctx.advanceAndTick(time))
/**
* Executes a single pending fiber and returns immediately. Does not advance time. Returns
* `false` if no fibers are pending.
*/
val tickOne: IO[Boolean] =
IO(ctx.tickOne())
/**
* Executes all pending fibers in a random order, repeating on new tasks enqueued by those
* fibers until all pending fibers have been exhausted. Does not result in the advancement of
* time.
*
* @see
* [[advance]]
* @see
* [[tickAll]]
*/
val tick: IO[Unit] =
IO(ctx.tick())
/**
* Drives the runtime until all fibers have been executed, then advances time until the next
* fiber becomes available (if relevant), and repeats until no further fibers are scheduled.
* Analogous to, though critically not the same as, running an [[IO]] ] on a single-threaded
* production runtime.
*
* This function will terminate for `IO`s which deadlock ''asynchronously'', but any program
* which runs in a loop without fully suspending will cause this function to run indefinitely.
* Also note that any `IO` which interacts with some external asynchronous scheduler (such as
* NIO) will be considered deadlocked for the purposes of this runtime.
*
* @see
* [[tick]]
*/
val tickAll: IO[Unit] =
IO(ctx.tickAll())
/**
* Returns `true` if the runtime has no remaining fibers, sleeping or otherwise, indicating an
* asynchronous deadlock has occurred. Or rather, ''either'' an asynchronous deadlock, or some
* interaction with an external asynchronous scheduler (such as another thread pool).
*/
val isDeadlocked: IO[Boolean] =
IO(ctx.state.tasks.isEmpty)
/**
* Produces the base64-encoded seed which governs the random task interleaving during each
* [[tick]]. This is useful for reproducing test failures which came about due to some
* unexpected (though clearly plausible) execution order.
*/
def seed: String = ctx.seed
}
object TestControl {
/**
* Executes a given [[IO]] under fully mocked runtime control. This is a convenience method
* wrapping the process of creating a new `TestControl` runtime, then using it to evaluate the
* given `IO`, with the `body` in control of the actual ticking process. This method is very
* useful when writing assertions about program state ''between'' clock ticks, and even more
* so when time must be explicitly advanced by set increments. If your assertions are entirely
* intrinsic (within the program) and the test is such that time should advance in an
* automatic fashion, the [[executeFully]] method may be a more convenient option.
*
* The `TestControl` parameter of the `body` provides control over the mock runtime which is
* executing the program. The second parameter produces the ''results'' of the program, if the
* program has completed. If the program has not yet completed, this function will return
* `None`.
*/
def execute[A](
program: IO[A],
config: IORuntimeConfig = IORuntimeConfig(),
seed: Option[String] = None)
: IO[TestControl[A]] =
IO {
val ctx = seed match {
case Some(seed) => TestContext(seed)
case None => TestContext()
}
val runtime: IORuntime = IORuntime(
ctx,
ctx,
new Scheduler {
def sleep(delay: FiniteDuration, task: Runnable): Runnable = {
val cancel = ctx.schedule(delay, task)
() => cancel()
}
def nowMillis() =
ctx.now().toMillis
def monotonicNanos() =
ctx.now().toNanos
},
() => (),
config)
val results = program.unsafeToFuture()(runtime)
new TestControl(ctx, results)
}
/**
* Executes an [[IO]] under fully mocked runtime control, returning the final results. This is
* very similar to calling `unsafeRunSync` on the program, except that the scheduler will use
* a mocked and quantized notion of time, all while executing on a singleton worker thread.
* This can cause some programs to deadlock which would otherwise complete normally, but it
* also allows programs which involve [[IO.sleep]] s of any length to complete almost
* instantly with correct semantics.
*
* @return
* `None` if `program` does not complete, otherwise `Some` of the results.
*/
def executeFully[A](
program: IO[A],
config: IORuntimeConfig = IORuntimeConfig(),
seed: Option[String] = None): IO[Option[Either[Throwable, A]]] =
execute(program, config = config, seed = seed).flatMap(c => c.tickAll *> c.results)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment