@alexandru
Last active June 16, 2023 21:57

import java.util.concurrent.TimeoutException

import cats.effect.IO
import scala.concurrent.duration.{Duration, FiniteDuration}

/**
 * A timeout function that blocks on the cancellation of the given task,
 * but only for a limited amount of time. This is done to balance
 * resource safety with liveness, within the context of Cats-Effect 3's
 * cancellation model.
 *
 * This implementation may have issues; I am still trying to figure out
 * how it interacts with the changes coming in Cats-Effect `3.5.0`:
 *
 *  - [[https://github.com/typelevel/cats-effect/issues/3456]]
 *  - [[https://github.com/typelevel/cats-effect/pull/3453#discussion_r1158048875]]
 *  - [[https://github.com/typelevel/cats-effect/issues/3396]]
 *
 * @param duration is the maximum duration the task is allowed to run
 *        before being cancelled;
 * @param forgetCancelAwaitAfter is how long we are willing to block on
 *        the cancellation of the timed-out task; a zero or negative value
 *        means not waiting at all, and an infinite value means waiting
 *        until the cancellation completes;
 * @param task is the task to run under the timeout.
 */
def timeoutBalanced[A](duration: FiniteDuration, forgetCancelAwaitAfter: Duration, task: IO[A]): IO[A] =
  IO.uncancelable { poll =>
    poll(IO.racePair(task, IO.sleep(duration))).flatMap {
      case Left((outcomeTask, sleepFiber)) =>
        // The task completed before the timeout. The odd-looking `embed` is needed because
        // the task might have cancelled itself (e.g., via `IO.canceled`), in which case we
        // must propagate the cancellation signal; otherwise this would block forever.
        poll(sleepFiber.cancel *> outcomeTask.embed(poll(IO.canceled) *> IO.never))
      case Right((taskFiber, _)) =>
        forgetCancelAwaitAfter match {
          case zero if zero <= Duration.Zero =>
            // This should be the current behavior of `timeoutAndForget`:
            // cancel in the background and fail immediately.
            taskFiber.cancel.start *> IO.raiseError(
              new TimeoutException(
                s"Task timed-out after ${duration.toString}, not waiting on the fiber's cancellation"
              )
            )
          case fd: FiniteDuration =>
            taskFiber.cancel.start.flatMap { taskCancelFiber =>
              // Cancellation of the task's fiber now runs in the background;
              // we are still on the current fiber, inside the uncancelable region.
              poll {
                // Wait for the cancellation to finish, but for at most `fd`.
                IO.race(
                  taskCancelFiber.joinWithUnit,
                  IO.sleep(fd)
                )
              }.flatMap { r =>
                val explanation = r match {
                  case Left(_) =>
                    "cancellation succeeded"
                  case Right(_) =>
                    s"cancellation timed-out after ${fd.toString}"
                }
                IO.raiseError(
                  new TimeoutException(
                    s"Task timed-out after ${duration.toString}, $explanation"
                  )
                )
              }
            }
          case _ /* Duration.Inf */ =>
            // The current behavior of `timeout`: wait for the cancellation to complete.
            taskFiber.cancel *> IO.raiseError(
              new TimeoutException(
                s"Task timed-out after ${duration.toString}, cancellation succeeded"
              )
            )
        }
    }
  }
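
// A minimal usage sketch, not part of the original gist. It assumes that
// `timeoutBalanced` (defined above) is in scope and that Cats-Effect 3 is on
// the classpath. `TimeoutBalancedExample` and `stubbornTask` are hypothetical
// names introduced for illustration only.
//
// The task below never completes and has a slow finalizer, so it exercises the
// `FiniteDuration` branch: the timeout fires after 1 second, then we wait at
// most 2 more seconds for the cancellation before giving up on it. Under these
// assumptions the program should report a timeout after roughly 3 seconds,
// rather than waiting the full 5 seconds that the finalizer needs.
object TimeoutBalancedExample extends cats.effect.IOApp.Simple {
  import cats.effect.IO
  import scala.concurrent.duration._

  // A task whose cancellation is slow: its finalizer sleeps for 5 seconds.
  val stubbornTask: IO[Unit] =
    IO.never[Unit].onCancel(IO.sleep(5.seconds))

  // `attempt` turns the expected TimeoutException into a `Left`, so that the
  // example prints the message instead of crashing the application.
  val run: IO[Unit] =
    timeoutBalanced(1.second, 2.seconds, stubbornTask).attempt.flatMap {
      case Left(e)  => IO.println(s"Failed: ${e.getMessage}")
      case Right(_) => IO.println("Completed in time")
    }
}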