Skip to content

Instantly share code, notes, and snippets.

@ChristopherDavenport
Created January 7, 2022 15:59
Show Gist options
  • Save ChristopherDavenport/f98b6a412696fc4f5328d7da8c1738f4 to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/f98b6a412696fc4f5328d7da8c1738f4 to your computer and use it in GitHub Desktop.
RWDeferred
import cats._
import cats.effect.{Sync, Async}
import cats.effect.kernel.MonadCancel
import cats.~>
// Like Deferred, except that no attempt is made to support multiple readers or writers.
// The expectation is that this will ONLY be written to once,
// while a single fiber eventually waits on the callback.
// Any other usage results in failure.
/**
 * A minimal deferred value intended for one writer and one waiting reader.
 *
 * State lives in two `@volatile` fields, each of which uses `null` to mean
 * "absent": the completed value, and the resume callback handed over by
 * `Async.cont` when a reader calls [[get]]. No coordination is attempted for
 * several concurrent readers or several concurrent writers.
 *
 * @tparam F effect type; an `Async[F]` instance is required
 * @tparam A type of the value being handed over
 */
final class RWDeferred[F[_]: Async, A] private (
  @volatile private[this] var value: A, // A | Null -- null means "not completed yet"
  @volatile private[this] var cb: Either[Throwable, A] => Unit // (Either[Throwable, A] => Unit) | Null
) {

  // Continuation body used by `get`. It stores the resume callback FIRST and
  // only then reads `value`; `complete` does the mirror image (stores `value`,
  // then reads `cb`).
  // NOTE(review): this ordering looks intentional, so that a reader and a
  // writer racing each other cannot both miss the other -- keep it as is.
  private[this] val awaitValue: cats.effect.Cont[F, A, A] =
    new cats.effect.Cont[F, A, A] {
      def apply[G[_]](implicit G: MonadCancel[G, Throwable]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] =
        (resume, suspended, _) => {
          cb = resume
          val current = value
          // Already completed: answer immediately; otherwise wait to be resumed.
          if (current != null) G.pure(current) else suspended
        }
    }

  /**
   * Stores `a` if no value has been stored yet and, when a callback has been
   * registered by [[get]], invokes it with `Right(a)`.
   *
   * @param a the value to publish
   * @return `true` if this call stored the value, `false` if a value was
   *         already present (in which case nothing is changed)
   */
  def complete(a: A): F[Boolean] = Sync[F].delay {
    if (value != null) false
    else {
      value = a
      val waiter = cb
      if (waiter != null) waiter(Right(a))
      true
    }
  }

  /**
   * Yields the stored value, suspending through `Async.cont` when no value
   * has been stored at the time the continuation body runs.
   */
  def get: F[A] = Async[F].cont(awaitValue)

  /** Reads the current value without waiting: `None` while the field is still `null`. */
  def tryGet: F[Option[A]] = Sync[F].delay {
    Option(value)
  }
}
object RWDeferred {

  /**
   * Allocates a fresh, empty `RWDeferred` inside `F`.
   *
   * Both constructor arguments are passed as `null`, which the class treats
   * as "no value yet" and "no callback registered".
   */
  def apply[F[_]: Async, A]: F[RWDeferred[F, A]] =
    Sync[F].delay {
      val noValue: A = null.asInstanceOf[A]
      val noCallback: Either[Throwable, A] => Unit = null
      new RWDeferred[F, A](noValue, noCallback)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment