Last active March 26, 2024 17:19
CachedResource for cats-effect

Concurrent resource caching for cats


cats-effect Resource is extremely handy for managing the lifecycle of stateful resources, for example database or queue connections. Its main interface is:

trait Resource[F[_], A] {
  /** - Acquire resource
    * - Run f
    * - guarantee that if acquire ran, release will run, even if `use` is cancelled or `f` fails
    */
  def use[B](f: A => F[B]): F[B]
}

It goes beyond even what try-finally gives, in that it guarantees concurrency safety and is aware of cancellation.

There are use cases where the shape of use is awkward: for example, when we want to reuse a resource for the lifetime of the whole application, but reallocate it automatically on certain errors or on a timer. Two such cases:

  • A RabbitMQ channel. We want one connection open for the whole application, but we don't want to stop/restart the whole app if there's a connection error; we only want to fix the channel.
  • External API clients with a TTL that we want to acquire, reuse, and invalidate at a point in time.


To solve this, I wrote this small interface:

trait CachedResource[F[_], A] {
  def run[B](f: A => F[B]): F[B]
  def invalidate: F[Unit]
  def invalidateIfNeeded(shouldInvalidate: A => Boolean): F[Unit]
}

It's similar to Resource, except that when run completes, the resource is not released immediately; it is released only when invalidate is called.
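To make the "release only on invalidate" behaviour concrete, here is a minimal synchronous sketch using only the Scala standard library. `SimpleCached` and `SimpleCachedDemo` are hypothetical names that are not part of the gist; the sketch has no effect type, no cancellation handling, and no concurrency safety, so it only illustrates the shape of the interface.

```scala
// Hypothetical, synchronous illustration of the CachedResource idea.
// Not concurrency-safe and not part of the original code.
final class SimpleCached[R](acquire: () => R, release: R => Unit) {
  private var current: Option[R] = None

  /** Reuse the cached instance if there is one, otherwise acquire a new one. */
  def run[B](f: R => B): B = {
    val r = current.getOrElse {
      val fresh = acquire()
      current = Some(fresh)
      fresh
    }
    f(r) // nothing is released when `f` returns
  }

  /** Release the cached instance, if any; the next `run` acquires again. */
  def invalidate(): Unit = {
    current.foreach(release)
    current = None
  }
}

object SimpleCachedDemo {
  var acquired = 0
  var released = 0

  // The "resource" is just the number of the acquisition that produced it
  val cached = new SimpleCached[Int](
    () => { acquired += 1; acquired },
    _ => released += 1
  )
}
```

Calling `run` twice in a row reuses the same instance; only `invalidate()` triggers the release function, after which the next `run` acquires a fresh instance.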

Implementation concept

Ref State machine

I added multiple implementations for different use cases, with differing guarantees. Each is implemented using a similar strategy. They model an internal state machine with a cats-effect Ref.

Ref is a functional interface to atomically updated, concurrency-safe mutable state. In these classes, I primarily use two methods of its API:

trait Ref[F[_], A] {
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
}

modify uses a compare-and-set strategy internally to guarantee lock-free concurrent modification. When you choose an f that returns (A, F[B]), you can think of it as semantically modeling one step of a state machine: (next state, next action). ref.modify(a => (newA, fAction)).flatten will then update the state and perform the action.

I label this inside the classes as:

def transition[A](f: State => (State, F[A])): F[A] =
  cache.modify(f).flatten
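As a rough illustration of the compare-and-set idea, here is a plain-JVM analogue built on `java.util.concurrent.atomic.AtomicReference`. `SimpleRef` and the traffic-light demo are hypothetical names; this is a sketch of the technique under those assumptions, not how cats-effect implements `Ref`, and the "action" is modeled as a plain thunk rather than an `F[A]`.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical plain-JVM analogue of Ref#modify (not the cats-effect implementation)
final class SimpleRef[S](initial: S) {
  private val cell = new AtomicReference[S](initial)

  def get: S = cell.get()

  /** Retry `f` until the compare-and-set wins.
    * `f` must be pure, because it may be evaluated more than once under contention. */
  @tailrec
  def modify[B](f: S => (S, B)): B = {
    val current = cell.get()
    val (next, result) = f(current)
    if (cell.compareAndSet(current, next)) result
    else modify(f)
  }

  /** One state-machine step: commit the next state, then run the returned action. */
  def transition[B](f: S => (S, () => B)): B = modify(f)()
}

object SimpleRefDemo {
  sealed trait Light
  case object Red extends Light
  case object Green extends Light

  val light = new SimpleRef[Light](Red)

  // Each call performs exactly one (next state, next action) step
  def flip(): String = light.transition[String] {
    case Red   => (Green, () => "go")
    case Green => (Red, () => "stop")
  }
}
```

The important property is the same as with `Ref`: the function passed to `modify` only computes the next state and describes the action; the action itself runs after the state update has been committed.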

Advanced Resource API

Fortunately, cats-effect Resource provides us an advanced api in addition to use:

trait Resource[F[_], A] {
 // ... other methods ...

 /** (resource, release-function). */
 def allocated: F[(A, F[Unit])]
}

This api is advanced/unsafe because it loses the guaranteed cleanup that Resource provides. It does allow us the flexibility we need to build much more advanced constructs on top of it, however.

Non-concurrent CachedResource

For a very simple implementation using that "Ref as state machine" concept, see SyncCachedResource. It uses SyncIO, which is a wrapped IO that is not allowed to perform concurrency or asynchronous execution.

In this case, we say

// Either empty, or allocated
type State = Option[(R, SyncIO[Unit])]

Here we can use the transition we defined, plus PartialFunction syntax sugar (omitting the argument of a function and using case instead) to get a succinct state machine expression.

// (Some details omitted)

def invalidate: SyncIO[Unit] = transition[Unit] {
  case None =>
    None -> SyncIO.unit
  case Some((_, release)) =>
    None -> release
}

def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
  case None =>
    // Like flatMap, but also give us a hook to run on Complete|Canceled|Failure
    None -> resource.allocated.bracketCase {
        case res @ (r, _) =>
          cache.set(Some(res)) >> f(r)
      } {
        case ((_, release), ExitCase.Canceled) => release
        case _                                 => SyncIO.unit
      }
  case s @ Some((r, _)) =>
    // Keep the same state, just perform the action
    s -> f(r)
}

Concurrent implementation

ConcurrentCachedObject is a concurrent implementation that wraps acquire: F[A] instead of Resource - something that doesn't need cleanup. This allows simpler logic, but we still have a state machine to work with.

// Given acquire: F[R]
type Gate = Deferred[F, Unit] // cats-effect Promise equivalent

sealed trait RState
case object Empty extends RState
case class Ready(r: R) extends RState
case class Pending(gate: Gate) extends RState

One transition in particular to highlight:

// Impurity as a convenience to avoid "just in case" allocations - we can't *run* any `F[A]` inside of `transition`, only return them.
// This is the equivalent of running with scissors, but I promise to be careful
def newGate(): Gate = Deferred.unsafe[F, Unit]

def run[A](f: R => F[A]): F[A] = transition[A] {
  case Empty =>
    val gate = newGate()
    Pending(gate) -> (runAcquire(gate) >> run(f))

  case s @ Pending(gate) =>
    s -> (gate.get >> run(f)) // wait for gate completion then retry

  case s @ Ready(r) => /* ... */
}

Pending here is used because the shape of modify is State => (State, data). The guarantee we get from Ref is that the state update will happen atomically and data is returned. We return an action (F[A]) as our data, but the execution of it could be delayed - perhaps another thread gets scheduled before us.

To handle that case, we atomically update the state with our Gate (Deferred):

/** A pure Promise. State is either "complete with `A`" or "empty" */
trait Deferred[F[_], A] {
  /** If empty, semantically block until complete, then return `A`
    * If complete, return `A` immediately
    */
  def get: F[A]
  /** Complete this, and allow any blocked `get` to return with `a`
    * May only be called once ever
    */
  def complete(a: A): F[Unit]
}

What we get from this combination is:

  • When we are Empty and we run, atomically set the state to Pending
  • Any future run must wait for Pending's gate to return before they are allowed to progress
  • runAcquire is allowed to take as long as it wants. It must handle failure, but that's relatively straightforward.
  • Once the gate is complete, all pending operations will proceed.
  • All those gate.get calls are semantically blocking but not thread blocking. Cats-effect gives us a "green thread" or "M-to-N" thread model, where computations (called Fibers) are run inside JVM (OS) threads, and semantically blocking a Fiber is an asynchronous operation that is cheap and safe.
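The Empty / Pending / Ready flow above can be sketched with the standard library alone. In this hypothetical analogue, `scala.concurrent.Promise[Unit]` stands in for the `Deferred` gate, `Future` stands in for `F`, and an `AtomicReference` stands in for `Ref`; `CachedLoad` and `CachedLoadDemo` are made-up names. Unlike the cats-effect version it has no notion of cancellation, so treat it as an illustration of the gating idea only.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical analogue of the Pending-gate pattern; not the gist's implementation
final class CachedLoad[R](acquire: () => Future[R])(implicit ec: ExecutionContext) {
  private sealed trait St
  private case object Empty extends St
  private case class Pending(gate: Promise[Unit]) extends St
  private case class Ready(r: R) extends St

  private val state = new AtomicReference[St](Empty)

  def run[A](f: R => Future[A]): Future[A] = state.get() match {
    case Ready(r) => f(r)
    case Pending(gate) =>
      // Wait for the gate without blocking a thread, then retry
      gate.future.flatMap(_ => run(f))
    case Empty =>
      val gate = Promise[Unit]()
      if (state.compareAndSet(Empty, Pending(gate))) {
        // We won the race, so we are the only caller that acquires
        acquire().transform { (result: Try[R]) =>
          val next: St = result match {
            case Success(r) => Ready(r)
            case Failure(_) => Empty // reset so that a later run can try again
          }
          state.set(next)
          gate.success(()) // wake every waiter, on success and on failure
          result
        }.flatMap(_ => run(f))
      } else run(f) // lost the race: re-read the state
  }
}

object CachedLoadDemo {
  import scala.concurrent.ExecutionContext.Implicits.global

  val acquisitions = new AtomicInteger(0)
  val cached = new CachedLoad[Int](() =>
    Future { Thread.sleep(20); acquisitions.incrementAndGet() })

  // Eight runs started while the slow acquire is still in flight
  def results(): Seq[Int] = {
    val all = Seq.fill(8)(cached.run(r => Future.successful(r)))
    Await.result(Future.sequence(all), 5.seconds)
  }
}
```

The first caller moves the state from Empty to Pending and performs the acquire; every other caller sees Pending and waits on the gate, so the slow acquire runs once no matter how many runs arrive in the meantime.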

The big one

ConcurrentCachedResource combines all of the above strategies to get a CachedResource which

  • Guarantees no resources will be leaked - if it's allocated, it's released (as long as someone calls invalidate)
  • Guarantees that at most one resource is allocated at a time
  • Allows any number of run calls concurrently
  • Allows concurrent calls of run and invalidate - run will never observe a closed resource
  • When invalidate is called, semantically blocks invalidate until existing run calls are complete.
  • When invalidate is called, semantically blocks future run calls until invalidate is complete


Special thanks to @SystemFW for his review, advice, and slide deck on Ref+Deferred, which was crucial to my ability to write this.

package teikametrics.effect

import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.implicits._
import cats.effect.{Bracket, Concurrent, ExitCase, Resource, Sync, SyncIO}
import cats.implicits._

import scala.util.control.NonFatal

trait CachedResource[F[_], R] extends CachedResource.Runner[F, R] {

  /** Invalidates any current instance of `R`, guaranteeing that ???*/
  def invalidate: F[Unit]

  /** Run `f` with an instance of `R`, possibly allocating a new one, or possibly reusing an existing one.
    * Guarantees that `R` will not be invalidated until `f` returns */
  def run[A](f: R => F[A]): F[A]

  /** Invalidate if `shouldInvalidate` returns true, otherwise do nothing */
  def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit]
}

object CachedResource {

  /** Run `f` with `run`, and invalidate if `f` fails and `shouldInvalidate` returns `true`
    *
    * @param shouldInvalidate If true, invalidate. If false or not defined, do not invalidate.
    *                         Default: always invalidate (assuming NonFatal)
    */
  def runAndInvalidateOnError[F[_], R, A](cr: CachedResource[F, R])(
      f: R => F[A],
      shouldInvalidate: PartialFunction[Throwable, Boolean] = {
        case NonFatal(_) => true
      }
  )(implicit F: Bracket[F, Throwable]): F[A] =
    cr.run(f).guaranteeCase {
      case ExitCase.Completed | ExitCase.Canceled =>
        F.unit
      case ExitCase.Error(e) =>
        val willInvalidate = shouldInvalidate.lift(e).getOrElse(false)
        F.whenA(willInvalidate)(cr.invalidate)
    }

  /** Runner that checks if refresh is needed before each `run` call, and additionally can invalidate on errors */
  def runner[F[_], R, A](cr: CachedResource[F, R])(
      shouldRefresh: R => Boolean,
      shouldInvalidate: PartialFunction[Throwable, Boolean] = {
        case NonFatal(_) => true
      }
  )(
      implicit F: Bracket[F, Throwable]
  ): Runner[F, R] = new Runner[F, R] {
    override def run[B](f: R => F[B]): F[B] =
      for {
        _ <- cr.invalidateIfNeeded(shouldRefresh)
        b <- cr.run(f).guaranteeCase {
          case ExitCase.Completed | ExitCase.Canceled =>
            F.unit
          case ExitCase.Error(e) =>
            val willInvalidate = shouldInvalidate.lift(e).getOrElse(false)
            F.whenA(willInvalidate)(cr.invalidate)
        }
      } yield b
  }

  // NB Runner is exactly the `Codensity` typeclass from haskell
  trait Runner[F[_], A] {
    def run[B](f: A => F[B]): F[B]
  }
}
object SyncCachedResource {
  def apply[R](resource: Resource[SyncIO, R]): SyncIO[SyncCachedResource[R]] =
    SyncIO(new SyncCachedResource(resource))
}

/** Non-concurrent-safe but simple CachedResource implementation */
class SyncCachedResource[R] private (resource: Resource[SyncIO, R])
    extends CachedResource[SyncIO, R] {

  // Either empty, or allocated
  private type State = Option[(R, SyncIO[Unit])]

  override def invalidate: SyncIO[Unit] = transition[Unit] {
    case None => None -> unit
    case Some((_, release)) =>
      None -> release
  }

  override def run[A](f: R => SyncIO[A]): SyncIO[A] = transition[A] {
    case None =>
      empty -> resource.allocated.bracketCase {
        case res @ (r, _) =>
          cache.set(Some(res)) >> f(r)
      } {
        case ((_, release), ExitCase.Canceled) => release
        case _                                 => unit
      }
    case s @ Some((r, _)) => s -> f(r)
  }

  override def invalidateIfNeeded(
      shouldInvalidate: R => Boolean): SyncIO[Unit] = transition[Unit] {
    case s @ Some((r, release)) =>
      if (shouldInvalidate(r)) empty -> release
      else s -> unit
    case None => empty -> unit
  }

  private def transition[A](f: State => (State, SyncIO[A])): SyncIO[A] =
    cache.modify(f).flatten

  private val empty = Option.empty[(R, SyncIO[Unit])]
  private val unit = SyncIO.unit
  private val cache: Ref[SyncIO, Option[(R, SyncIO[Unit])]] =
    Ref.unsafe[SyncIO, Option[(R, SyncIO[Unit])]](None)
}
object ConcurrentCachedResource {
  def apply[F[_]: Concurrent, R](
      resource: Resource[F, R]): F[ConcurrentCachedResource[F, R]] =
    Sync[F].delay(new ConcurrentCachedResource(resource))
}

// private ctor because of Ref.unsafe in class body, `new` needs `F.delay` around it
class ConcurrentCachedResource[F[_], R] private (resource: Resource[F, R])(
    implicit F: Concurrent[F]
) extends CachedResource[F, R] {

  /** Resource state */
  private sealed trait RState
  private type Gate = Deferred[F, Unit]
  private case object Empty extends RState
  private case class Ready(r: R,
                           release: F[Unit],
                           running: Int,
                           pendingInvalidation: Option[Gate])
      extends RState
  private case class Allocating(gate: Gate) extends RState
  private case class Invalidating(gate: Gate) extends RState

  override def invalidate: F[Unit] = transition[Unit] {
    case s @ Ready(_, release, running, pendingInvalidation) =>
      running match {
        case 0 =>
          val gate = pendingInvalidation.getOrElse(newGate())
          Invalidating(gate) -> runRelease(release, gate)
        case _ =>
          // Jobs in flight - they need to clean up
          pendingInvalidation match {
            case Some(_) =>
              s -> F.unit // could getOrElse for shorter code but prefer to avoid the allocation
            case None =>
              s.copy(pendingInvalidation = Some(newGate())) -> F.unit
          }
      }
    case s @ Invalidating(gate) =>
      // We only enter this state when jobs are in-flight - just wait on them to finish us
      s -> gate.get
    case Empty =>
      Empty -> F.unit
    case s @ Allocating(gate) =>
      // Preserve invariant that `run >> invalidate >> run` acquires resource twice
      s -> (gate.get *> invalidate)
  }

  override def run[A](f: R => F[A]): F[A] = transition[A] {
    case s @ Ready(r, _, running, None) =>
      s.copy(running = running + 1) -> f(r).guarantee(runCompleted)
    case s @ Ready(_, _, _, Some(gate)) =>
      s -> (gate.get >> run(f))
    case Empty =>
      val gate = newGate()
      Allocating(gate) -> (runAllocate(gate) >> run(f))
    case s @ Allocating(gate) =>
      s -> (gate.get >> run(f))
    case s @ Invalidating(gate) =>
      s -> (gate.get >> run(f))
  }

  private def runCompleted: F[Unit] = transition[Unit] {
    case s @ Ready(_, _, running, pendingInvalidation) =>
      val stillRunning = running - 1
      val action = F.whenA(stillRunning == 0) {
        // Our job to trigger the cleanup - `uncancelable` because if that final invalidation gets cancelled, then
        // nothing will ever `complete` the `pendingInvalidation` gate, and the whole thing deadlocks
        pendingInvalidation.traverse_(_ => invalidate.uncancelable)
      }
      s.copy(running = stillRunning) -> action
    case other =>
      other -> F.raiseError(new IllegalStateException(
        s"Tried to complete run when state was $other. This means there is an implementation error in ${this.getClass.getCanonicalName}"))
  }

  // Must only be called at the right time, otherwise we could close a resource currently in-use in a `run` call
  private def runRelease(release: F[Unit], gate: Gate): F[Unit] =
    (release >> cache.set(Empty) >> gate.complete(())).uncancelable

  override def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit] =
    transition[Unit] {
      case s @ Ready(r, _, _, None) =>
        s -> F.whenA(shouldInvalidate(r))(invalidate)
      case other => other -> F.unit
    }

  private def runAllocate(gate: Gate): F[Unit] =
    // bracketCase is needed here; allocated.flatMap isn't safe with cancellation
    resource.allocated
      .bracketCase {
        case (r, release) =>
          (cache.set(Ready(r, release, 0, None)) *> gate.complete(())).uncancelable
      } {
        case ((_, release), ExitCase.Canceled) =>
          // Cancelled between `allocated` and `bracketCase(use)`
          runRelease(release, gate)
        case _ => F.unit
      }
      .onError {
        case NonFatal(e) =>
          // On allocation error, reset the cache and notify anyone that started waiting while we were in `Allocating`
          runRelease(F.unit, gate)
      }

  // Using `unsafe` just so that I can have RState be an inner type, to avoid useless type parameters on RState
  private val cache = Ref.unsafe[F, RState](Empty)

  // empty parens to disambiguate the overload
  private def transition[A](f: RState => (RState, F[A])): F[A] =
    cache.modify(f).flatten

  // `unsafe` because the effect being run is just a `new AtomicRef...`, and most cases where we might need it,
  // we don't need it, so don't force `flatMap` to get it ready "just in case"
  private def newGate(): Gate = Deferred.unsafe[F, Unit]
}
object ConcurrentCachedObject {
  def apply[F[_]: Concurrent, R](acquire: F[R]): F[CachedResource[F, R]] =
    Sync[F].delay(new ConcurrentCachedObject[F, R](acquire))
}

// private ctor because of Ref.unsafe in class body, `new` needs `F.delay` around it
class ConcurrentCachedObject[F[_], R] private (acquire: F[R])(
    implicit F: Concurrent[F]
) extends CachedResource[F, R] {

  /** Resource state */
  private sealed trait RState
  private type Gate = Deferred[F, Unit]
  private case object Empty extends RState
  private case class Ready(r: R) extends RState
  private case class Pending(gate: Gate) extends RState

  override def invalidate: F[Unit] =
    transition[Unit](_ => Empty -> F.unit)

  override def run[A](f: R => F[A]): F[A] = transition[A] {
    case s @ Ready(r) =>
      s -> f(r)
    case Empty =>
      val gate = newGate()
      Pending(gate) -> (runAcquire(gate) >> run(f))
    case s @ Pending(gate) =>
      s -> (gate.get >> run(f))
  }

  override def invalidateIfNeeded(shouldInvalidate: R => Boolean): F[Unit] =
    transition[Unit] {
      case s @ Ready(r) =>
        s -> F.whenA(shouldInvalidate(r))(invalidate)
      case other =>
        other -> F.unit
    }

  private def runAcquire(gate: Gate): F[Unit] =
    acquire
      .flatMap { r =>
        (cache.set(Ready(r)) *> gate.complete(())).uncancelable
      }
      .onError {
        case NonFatal(_) =>
          // On allocation error, reset the cache and notify anyone that started waiting while we were in `Pending`
          setEmpty(gate)
      }

  private def setEmpty(gate: Gate): F[Unit] =
    (cache.set(Empty) >> gate.complete(())).uncancelable

  // Using `unsafe` just so that I can have RState be an inner type, to avoid useless type parameters on RState
  private val cache = Ref.unsafe[F, RState](Empty)

  // empty parens to disambiguate the overload
  private def transition[A](f: RState => (RState, F[A])): F[A] =
    cache.modify(f).flatten

  // `unsafe` because the effect being run is just a `new AtomicRef...`, and most cases where we might need it,
  // we don't need it, so don't force `flatMap` to get it ready "just in case"
  private def newGate(): Gate = Deferred.unsafe[F, Unit]
}
package teikametrics.effect
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.laws.util.TestContext
import cats.effect.{ContextShift, IO, Resource, Sync, SyncIO, Timer}
import cats.implicits._
import cats.{Applicative, ApplicativeError, FlatMap}
import fs2.Stream
import org.scalactic.source.Position
import org.scalatest.words.ResultOfStringPassedToVerb
import org.scalatest.{Assertion, AsyncFlatSpec, Inspectors, Matchers}
import teikametrics.RefLogger
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NoStackTrace
class ConcurrentCachedResourceSpec
extends AsyncFlatSpec with ConcurrentCachedResourceBehavior {
"ConcurrentCachedResource" should behave like cachedResource(create)
"ConcurrentCachedResource" should behave like concurrentCachedResource(create)
"ConcurrentCachedResource (slow acquire)" should behave like concurrentCachedResource(
slowResource(time, 0.milli))
"ConcurrentCachedResource (slow release)" should behave like concurrentCachedResource(
slowResource(0.milli, time))
"ConcurrentCachedResource (slow acquire & release)" should behave like concurrentCachedResource(
slowResource(time, time))
"failing to acquire" should "not deadlock runs" inIO {
    // TODO copy this test for behave like cachedResource
    for {
      (_, res, allocOk, _) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      run1Result <- cr.run(_ => IO.unit).attempt
      _ <- allocOk.set(true)
      _ <- cr.run(_.assertLive[IO]).timeout(1.milli)
    } yield run1Result shouldBe Left(FailAlloc)
  }
"failing to acquire" should "not deadlock invalidation" inIO {
    // TODO copy this test for behave like cachedResource
    for {
      (_, res, allocOk, _) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      run1Result <- cr.run(_ => IO.unit).attempt
      _ <- allocOk.set(true)
      _ <- cr.invalidate.timeout(1.milli)
    } yield run1Result shouldBe Left(FailAlloc)
  }
"failing to acquire (slowly)" should "not deadlock runs" inIO {
    for {
      (_, res, allocOk, sleeper) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      _ <- sleeper.set(Some(3.nano))
      run1 <- cr.run(_ => IO.unit).attempt.start
      _ <- timer.sleep(time) // less than alloc timeout, but long enough that we are sure `.start` has begun
      _ <- allocOk.set(true)
      _ <- sleeper.set(None)
      _ <- cr.run(_.assertLive[IO]).timeout(1.milli)
      run1Result <- run1.join
    } yield run1Result shouldBe Left(FailAlloc)
  }
"failing to acquire (slowly)" should "not deadlock invalidate" inIO {
    for {
      (_, res, allocOk, sleeper) <- unreliableResource
      cr <- ConcurrentCachedResource(res)
      _ <- allocOk.set(false)
      _ <- sleeper.set(Some(3.nano))
      run1 <- cr.run(_ => IO.unit).attempt.start
      _ <- timer.sleep(time) // less than alloc timeout, but long enough that we are sure `.start` has begun
      _ <- cr.invalidate.timeout(1.milli)
      run1Result <- run1.join
    } yield run1Result shouldBe Left(FailAlloc)
  }
def slowResource(
acquireSleep: FiniteDuration,
releaseSleep: FiniteDuration
): IO[(Pool, CachedResource[IO, Obj])] =
for {
pool <- Ref[IO].of(Map.empty[Int, Obj])
ids <- Ref[IO].of(1)
res = Resource.make(
timer.sleep(acquireSleep).uncancelable *> Resources.alloc(ids, pool)) {
obj =>
(timer.sleep(releaseSleep).uncancelable *> Resources
cr <- ConcurrentCachedResource(res)
} yield pool -> cr
case object FailAlloc
extends Exception("Failed resource allocate") with NoStackTrace
def unreliableResource: IO[(Pool,
Resource[IO, Obj],
Ref[IO, Boolean],
Ref[IO, Option[FiniteDuration]])] =
for {
pool <- Ref[IO].of(Map.empty[Int, Obj])
ids <- Ref[IO].of(1)
passer <- Ref[IO].of(true)
sleeper <- Ref[IO].of(Option.empty[FiniteDuration])
} yield {
val alloc = for {
pass <- passer.get
sleep <- sleeper.get
_ <- sleep.traverse_(timer.sleep)
obj <- if (pass) Resources.alloc(ids, pool)
else IO.raiseError(FailAlloc)
} yield obj
val resource = Resource.make(alloc) { obj =>
(pool, resource, passer, sleeper)
def create: IO[(Pool, CachedResource[IO, Obj])] =
for {
(pool, res) <- Resources.basic
cr <- ConcurrentCachedResource(res)
} yield (pool, cr)
class SyncCachedResourceSpec
extends AsyncFlatSpec with CachedResourceBehavior[SyncIO] {
// works with AsyncTestSuite serialExecutionContext
"SyncCachedResource" should behave like cachedResource(create)
def create =
for {
(pool, res) <- Resources.basic
cr <- SyncCachedResource(res)
} yield (pool, cr)
override protected def toFuture(fa: SyncIO[Assertion])(
implicit pos: Position): Future[Assertion] =
class ConcurrentCachedObjectSpec
extends AsyncFlatSpec with ConcurrentCachedResourceBehavior {
behavior of "ConcurrentCachedObject"
it should behave like cachedResource(create, withCleanup = false)
it should behave like concurrentCachedResource(create, withCleanup = false)
def create: IO[(Pool, CachedResource[IO, Obj])] =
for {
pool <- Ref[IO].of(Map.empty[Int, Obj])
ids <- Ref[IO].of(1)
res = Resources.alloc(ids, pool)
cr <- ConcurrentCachedObject(res)
} yield (pool, cr)
class Obj(val id: Int) {
  private var _alive: Boolean = true
  def alive: Boolean = synchronized(_alive)
  def unsafeRelease(): Unit = synchronized(_alive = false)
  def assertLive[F[_]](implicit F: ApplicativeError[F, Throwable]): F[Unit] =
    if (alive) F.unit
    else F.raiseError(new Exception(s"Obj ${this.id} is dead"))
  override def equals(obj: Any): Boolean = obj match {
    case that: Obj => this eq that
    case _         => false
  }
  override def hashCode(): Int = id.hashCode()
  override def toString: String = s"Obj($id alive=$alive)"
}
trait BaseCachedResourceBehavior[F[_]] extends Matchers with Inspectors {
this: AsyncFlatSpec =>
protected val time = 1.nano
protected type Pool = Ref[F, Map[Int, Obj]]
protected object Resources {
    def alloc(ids: Ref[F, Int], pool: Pool)(implicit F: FlatMap[F]): F[Obj] =
      ids.modify(cur => (cur + 1, cur)).flatMap { id =>
        pool.modify { m =>
          val obj = new Obj(id)
          m.updated(obj.id, obj) -> obj
        }
      }
def basicRelease(pool: Pool)(implicit F: Sync[F]): Obj => F[Unit] =
obj => F.delay(obj.unsafeRelease())
def basic(implicit F: Sync[F]): F[(Pool, Resource[F, Obj])] =
for {
pool <- Ref[F].of(Map.empty[Int, Obj])
ids <- Ref[F].of(1)
} yield pool -> Resource.make(alloc(ids, pool))(basicRelease(pool))
protected def toFuture(fa: F[Assertion])(
implicit pos: Position): Future[Assertion]
protected implicit class ItVerbStringOps(itVerbString: ItVerbString) {
def inIO(testFun: => F[Assertion])(implicit pos: Position): Unit =
protected implicit class ResultOfStringPassedToVerbOps(
obj: ResultOfStringPassedToVerb) {
def inIO(testFun: => F[Assertion])(implicit pos: Position): Unit =
trait CachedResourceBehavior[F[_]] extends BaseCachedResourceBehavior[F] {
this: AsyncFlatSpec =>
/** should behave like cachedResource(create) */
  protected def cachedResource(
      create: F[(Pool, CachedResource[F, Obj])],
      withCleanup: Boolean = true // Whether or not this resource is expected to clean up Obj on invalidate
  )(implicit F: Sync[F]): Unit = {
    it should "run with no previous state" inIO {
      for {
        (_, cr) <- create
        _ <- cr.run(_.assertLive[F])
      } yield succeed
    }
    it should "invalidate with no previous state" inIO {
      for {
        (_, cr) <- create
        _ <- cr.invalidate
      } yield succeed
    }
    it should "run and then invalidate" inIO {
      for {
        (pool, cr) <- create
        id <- cr.run(r => r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        obj <- pool.get.map(_.get(id))
      } yield {
        if (withCleanup)
          obj.get.alive shouldEqual false
        else succeed
      }
    }
    it should "reuse for multiple runs" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => r.assertLive[F].as(r.id))
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 shouldEqual id2
    }
    it should "get a new resource after invalidating" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 should not equal id2
    }
    it should "allow run to fail and still work after" inIO {
      for {
        (_, cr) <- create
        oops = new Exception("oops")
        result <- cr.run(_ => F.raiseError[Int](oops)).attempt
        alive <- cr.run(_.alive.pure[F])
      } yield {
        result shouldEqual Left(oops)
        alive shouldBe true
      }
    }
  }
}
trait ConcurrentCachedResourceBehavior extends CachedResourceBehavior[IO] {
this: AsyncFlatSpec =>
implicit val ctx: TestContext = TestContext()
// Explicitly pass Async[IO] because sometimes scalac wants to compile
// to 'ctx.contextShift(IO.ioConcurrentEffect(this.CS))`, which is recursive, and explodes.
// And yes, "sometimes", because implicit resolution is slightly nondeterministic
implicit val CS: ContextShift[IO] = ctx.contextShift[IO](IO.ioEffect)
implicit val timer: Timer[IO] = ctx.timer[IO]
def concurrentCachedResource(
create: IO[(Pool, CachedResource[IO, Obj])],
withCleanup: Boolean = true
): Unit = {
// Alias here so I can move code between the traits easier
type F[A] = IO[A]
    it should "get a new resource after invalidating (concurrently)" inIO {
      for {
        (_, cr) <- create
        id1 <- cr.run(r => timer.sleep(time) *> r.assertLive[F].as(r.id))
        _ <- cr.invalidate
        id2 <- cr.run(r => r.assertLive[F].as(r.id))
      } yield id1 should not equal id2
    }
    it should "reuse resource when starting a run while one run is in progress" inIO {
      for {
        (_, cr) <- create
        gate <- Deferred[F, Unit]
        // use gate so run can't complete until after another concurrent run starts
        run1 <- cr.run(r => gate.get.as(r.id)).start
        id2 <- cr.run(r => gate.complete(()).as(r.id))
        id1 <- run1.join
      } yield id1 shouldEqual id2
    }
    it should "defer releasing for invalidate until in flight run completes" inIO {
      for {
        (_, cr) <- create
        run <- cr.run(r => timer.sleep(time) *> r.assertLive[F]).start
        _ <- cr.invalidate
        _ <- run.join
      } yield succeed
    }
it should "race run and invalidate without failing or leaking" inIO {
for {
(pool, cr) <- create
_ <- => IO.unit) // warmup allocate
parLimit = 8 // arbitrary
tasks = 100 // arbitrary
results <- Stream( => timer.sleep( *> r.assertLive[F]).attempt,
.mapAsyncUnordered(parLimit)(io => io)
_ <- cr.invalidate // Make sure the last task is to invalidate
objects <- pool.get
} yield {
all(results) shouldBe 'right
if (withCleanup) {
forAll(objects.values) { obj =>
obj.alive shouldBe false
val numAllocated = objects.keySet.max
val maxAllocated = tasks / 2 // div by 2 because half are run, half invalidate
numAllocated should be <= maxAllocated
it should "not leak or deadlock under aggressive cancellation and concurrency" inIO {
      sealed abstract class Task {
        def id: Int
        def run(cr: CachedResource[F, Obj]): F[Unit]
      }
      case class Sleep(id: Int, dur: Int) extends Task {
        def run(cr: CachedResource[F, Obj]): F[Unit] =
          cr.run(r => timer.sleep(dur.nanos) *> r.assertLive[F])
      }
      case object Ex extends Exception("ok") with NoStackTrace
      case class Err(id: Int) extends Task {
        val err: F[Unit] = IO.raiseError(Ex)
        def run(cr: CachedResource[F, Obj]): F[Unit] = cr.run(_ => err).recoverWith {
          case Ex => IO.unit
        }
      }
      case class Invalidate(id: Int) extends Task {
        def run(cr: CachedResource[F, Obj]): F[Unit] =
          cr.invalidate
      }
val taskCount = 1000
for {
log <- RefLogger.withTime[F]
(pool, cr) <- create
rand = IO(Random.nextInt(5)) // 0 to 4 inclusive
bool = IO(Random.nextBoolean())
ids = Stream.iterate(0)(_ + 1)
tasks: Stream[F, (String, Either[Throwable, Unit])] = Stream[
Int => F[Task]
i => => Sleep(i, dur)),
i => (Err(i): Task).pure[F],
i => (Invalidate(i): Task).pure[F],
.zipWith(ids) { case (mkTask, i) => mkTask(i) }
.mapAsyncUnordered(taskCount) { t: Task =>
for {
_ <-"Start $t")
f <-
} yield (t, f)
} // concurrent .start in non-deterministic order
.mapAsyncUnordered(taskCount) {
case (t, f) =>
for {
_ <-"End $t")
// Timeout will only fail if we deadlocked
e <- Sync[F].ifM(bool)(f.cancel.attempt,
_ <-" Ended $t: $e")
} yield t.toString -> e
} // Cancel/join in non-deterministic order
results <- tasks.compile.toVector
_ <- cr.invalidate
objects <- pool.get
logData <- log.history // unused but available for debugger exploration
} yield {
val logLines = logData.toVector // unused, but in scope so it's visible in the debugger
if (withCleanup) { forAll(objects.values)(_.alive shouldBe false) }
results.foreach {
case (taskId, result) =>
withClue(taskId) {
result shouldBe Right(())
final override protected def toFuture(fa: IO[Assertion])(
implicit pos: Position): Future[Assertion] = {
val year =
val test = fa
new Exception(
"Test case did not complete within 1 year. Deadlock is likely"))
.unsafeToFuture() // Begin eager test execution async
// Resolve `IO` concurrency inside `test` by advancing the clock
ctx.tick(1.minute) // Definitely past our `timeoutTo`
val tasksAfterTick = ctx.state.tasks
if (tasksAfterTick.isEmpty) {
test // Now that `ctx` has no remaining `IO` to run, return the (completed) `Future[Assertion]`
} else {
// timeoutTo wasn't enough, maybe we deadlocked `uncancelable` IO?.
// `Future` has no ability to cancel, so hopefully it gets GC'd
throw new IllegalStateException(
s"""Test probably deadlocked.
| tasksAfterTick=$tasksAfterTick
| pos=$pos""".stripMargin
Here is the adapted code to support a key-value cached object.
I am not satisfied with the signatures; any recommendations?

// Credit to Gavin Bisesi and his wonderful implementation at:

// It's actually not a resource, just an effectful constructor
trait MultiCachedResource[F[_], K, R] {
  def run[A](k: K)(f: R => F[A]): F[Option[A]]
}

object ConcurrentMultiCachedObject {
  def apply[F[_]: Concurrent, K, R](acquire: K => F[Option[R]]): F[MultiCachedResource[F, K, R]] =
    Sync[F].delay(new ConcurrentMultiCachedObject[F, K, R](acquire))
}

class ConcurrentMultiCachedObject[F[_]: Concurrent, K, R](acquire: K => F[Option[R]]) extends MultiCachedResource[F, K, R] {

  private type MRState = Map[K, Option[RState]]
  private val cache = Ref.unsafe[F, MRState](Map.empty[K, Option[RState]])

  private def transition[A](f: MRState => (MRState, F[A])): F[A] =
    cache.modify(f).flatten

  private type Gate = Deferred[F, Unit]
  private def newGate(): Gate = Deferred.unsafe[F, Unit]

  private sealed trait RState
  private case class Ready(r: R) extends RState
  private case class Pending(gate: Gate) extends RState

  override def run[A](k: K)(f: R => F[A]): F[Option[A]] = transition[Option[A]](map => map.get(k) match {
    case Some(v) => v match {
      case Some(Ready(r)) =>
        map -> f(r).map(_.some)

      case Some(Pending(gate: Gate)) =>
        map -> (gate.get >> run(k)(f))

      case None =>
        // The key was looked up before and `acquire` returned nothing for it
        map -> none[A].pure[F]
    }

    case None =>
      val gate = newGate()
      map + (k -> Pending(gate).some) -> (runAcquire(k)(gate) >> run(k)(f))
  })

  private def runAcquire(k: K)(gate: Gate): F[Unit] =
    acquire(k).flatMap { maybeR =>
      (cache.update(_ + (k -> maybeR.map(Ready(_)))) *> gate.complete(())).uncancelable
    }.onError {
      case NonFatal(_) =>
        // On acquire error, forget the key and release anyone waiting on the gate
        (cache.update(_ - k) >> gate.complete(())).uncancelable
    }
}

Daenyth commented Nov 9, 2020

All code in my post is available free to use under the MIT open source license

