/timeout-balanced.scala Secret
Last active
June 16, 2023 21:57
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A timeout function that blocks on the cancellation of the given task, | |
* but only for a limited amount of time. This is done to balance | |
* resource safety with liveliness, within the context of Cats-Effect 3's | |
* cancellation model. | |
* | |
* There are possible issues with this implementation, still trying to figure | |
* out the changes coming in Cats-Effect `3.5.0`: | |
* | |
* - [[https://github.com/typelevel/cats-effect/issues/3456]] | |
* - [[https://github.com/typelevel/cats-effect/pull/3453#discussion_r1158048875]] | |
* - [[https://github.com/typelevel/cats-effect/issues/3396]] | |
* | |
* @param duration is the maximum duration the task is allowed to run, | |
* before being cancelled; | |
* @param forgetCancelAwaitAfter is how much we are willing to block on | |
* the cancellation of the timed-out task; | |
*/ | |
def timeoutBalanced[A](duration: FiniteDuration, forgetCancelAwaitAfter: Duration, task: IO[A]): IO[A] = | |
IO.uncancelable { poll => | |
poll(IO.racePair(task, IO.sleep(duration))).flatMap { | |
case Left((outcomeTask, sleepFiber)) => | |
// Task completed before the timeout — the weird `embed` here is needed because the | |
// task might have cancelled itself (e.g., `IO.canceled`), and we need to propagate | |
// the cancellation signal, otherwise it will block forever | |
poll(sleepFiber.cancel *> outcomeTask.embed(poll(IO.canceled) *> IO.never)) | |
case Right((taskFiber, _)) => | |
forgetCancelAwaitAfter match { | |
case zero if zero <= Duration.Zero => | |
// This should be the current behavior of `timeoutAndForget` | |
taskFiber.cancel.start *> IO.raiseError( | |
new TimeoutException( | |
s"Task timed-out after ${duration.toString}, not waiting on the fiber's cancellation" | |
) | |
) | |
case fd: FiniteDuration => | |
taskFiber.cancel.start.flatMap { taskCancelFiber => | |
// We started cancelling the task's fiber; should still be running | |
// on the current, uncancelable fiber | |
poll { | |
IO.race( | |
taskCancelFiber.joinWithUnit, | |
IO.sleep(fd) | |
) | |
}.flatMap { r => | |
val explanation = r match { | |
case Left(_) => | |
"cancellation succeeded" | |
case Right(_) => | |
s"cancellation timed-out after ${fd.toString}" | |
} | |
IO.raiseError( | |
new TimeoutException( | |
s"Task timed-out after ${duration.toString}, $explanation" | |
) | |
) | |
} | |
} | |
case _ /* Duration.Inf */ => | |
// The current behavior of `timeout` | |
taskFiber.cancel *> IO.raiseError( | |
new TimeoutException( | |
s"Task timed-out after ${duration.toString}, cancellation succeeded" | |
) | |
) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment