Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Last active October 11, 2021 13:06
Show Gist options
  • Save ChristopherDavenport/8705c8401f05bba36a9bcf69e91f5d5c to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/8705c8401f05bba36a9bcf69e91f5d5c to your computer and use it in GitHub Desktop.
Set Once Cache
import cats._
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import scala.concurrent.duration._
import io.chrisdavenport.mapref._
/**
 * A keyed cache whose operations run in the effect type `F`.
 *
 * @tparam F effect type in which lookups and deletions are expressed
 * @tparam K key type
 * @tparam V value type
 */
trait SetOnceCache[F[_], K, V] {

  /**
   * Looks up the value associated with `k`.
   *
   * @param k key to look up
   * @return an effect producing the value for `k`
   */
  def get(k: K): F[V]

  /**
   * Removes whatever is currently associated with `k`.
   *
   * @param k key to remove
   * @return an effect that performs the removal
   */
  def delete(k: K): F[Unit]
}
/** Constructor and internal state machine for [[SetOnceCache]]. */
object SetOnceCache {

  /** Raised in callers that were waiting on an in-flight computation whose key was deleted. */
  case class ClearedWhileWaitingException() extends RuntimeException("SetOnceCache evicted while waiting")

  /** Raised in callers that were waiting on an in-flight computation that ended up canceled. */
  case class CanceledWhileWaitingException() extends RuntimeException("SetOnceCache canceled while waiting")

  // Per-key state kept in the map; a missing entry means nothing is cached or in flight.
  private sealed trait State[F[_], V]
  // A computation is in flight. `identifier` names the `get` call that owns it and `getWait`
  // is completed with that computation's outcome (or with an error by `delete`).
  private case class Waiting[F[_], V](identifier: Unique.Token, getWait: Deferred[F, Outcome[F, Throwable, V]]) extends State[F, V]
  // The computation succeeded; `value` is the stored result effect.
  private case class Done[F[_], V](value: F[V]) extends State[F, V]

  // What a `get` call has to do, decided atomically together with the state transition.
  private sealed trait Operation[F[_], V] extends Product with Serializable
  // This call registered itself as the owner: run `f` and publish the outcome through `set`.
  private case class Set[F[_], V](set: Deferred[F, Outcome[F, Throwable, V]]) extends Operation[F, V]
  // Another call owns the computation: wait on its deferred.
  private case class Wait[F[_], V](getWait: Deferred[F, Outcome[F, Throwable, V]]) extends Operation[F, V]
  // A result is already stored: return it.
  private case class Go[F[_], V](value: F[V]) extends Operation[F, V]

  /**
   * Builds a cache backed by a concurrent trie map.
   *
   * For a key with no entry, `get` runs `f` and stores the successful result; calls to `get`
   * made while that computation is in flight wait for its outcome instead of running `f` again.
   * A failed or canceled computation is not stored. `delete` removes the entry and fails any
   * callers currently waiting on it with [[ClearedWhileWaitingException]].
   *
   * @param f computes the value for a key
   * @return an effect allocating the cache
   */
  def impl[F[_]: Async, K, V](f: K => F[V]): F[SetOnceCache[F, K, V]] =
    MapRef.ofScalaConcurrentTrieMap[F, K, State[F, V]].map { mapref =>
      new SetOnceCache[F, K, V] {

        def get(k: K): F[V] =
          (Async[F].unique, Deferred[F, Outcome[F, Throwable, V]]).tupled.flatMap { case (unique, wait) =>
            Async[F].uncancelable { poll =>
              mapref(k).modify[Operation[F, V]] {
                case None => (Waiting(unique, wait).some, Set(wait))
                case s @ Some(Waiting(_, pending)) => (s, Wait(pending))
                case s @ Some(Done(v)) => (s, Go(v))
              }.flatMap {
                case Set(set) =>
                  // Publish our outcome only while the entry still carries OUR token. If the key
                  // was deleted in the meantime (and possibly re-registered by a newer `get`),
                  // the entry belongs to someone else: overwriting it would strand that owner's
                  // waiters, because the owner would later find a foreign state and never
                  // complete its own deferred. Our own waiters were already released by `delete`.
                  def publish(outcome: Outcome[F, Throwable, V], next: Option[State[F, V]]): F[Unit] =
                    mapref(k).modify[F[Unit]] {
                      case Some(Waiting(identifier, _)) if identifier === unique =>
                        (next, set.complete(outcome).void)
                      case other =>
                        (other, Applicative[F].unit)
                    }.flatten

                  // Only `f(k)` is cancelable; the finalizer always runs so waiters are released.
                  poll(f(k)).guaranteeCase {
                    case succeeded @ Outcome.Succeeded(fa) => publish(succeeded, Done(fa).some)
                    // Errors and cancelation are not cached: drop the entry so a later `get` retries.
                    case failed => publish(failed, None)
                  }

                case Wait(pending) =>
                  poll(
                    pending.get.flatMap {
                      case Outcome.Succeeded(fa) => fa
                      case Outcome.Errored(e) => ApplicativeError[F, Throwable].raiseError[V](e)
                      case Outcome.Canceled() =>
                        ApplicativeError[F, Throwable].raiseError[V](CanceledWhileWaitingException())
                    }
                  )

                case Go(v) => poll(v)
              }
            }
          }

        def delete(k: K): F[Unit] = Async[F].uncancelable { _ =>
          mapref(k).modify[F[Unit]] {
            // Release anyone waiting on the in-flight computation before forgetting it.
            case Some(Waiting(_, pending)) =>
              (None, pending.complete(Outcome.errored(ClearedWhileWaitingException())).void)
            case Some(Done(_)) => (None, Applicative[F].unit)
            case None => (None, Applicative[F].unit)
          }.flatten
        }
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment