-
-
Save alexandru/4e02c5ff340756e7dcc07454d1308324 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.implicits._ | |
import monix.execution.atomic.Atomic | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
/** A concurrent, `Future`-based cache keyed by `String`.
  *
  * For each key, the first caller registers a `Promise` in the shared
  * atomic map and evaluates the task; every other caller for the same
  * key receives the future of that promise, so the task runs at most
  * once per key. Failures are cached just like successful results.
  *
  * @param ref atomic reference holding the key -> promise registry
  */
class Cache[A] private (ref: Atomic[Map[String, Promise[A]]]) {

  /** Returns the cached result for `key`, evaluating `task` only if no
    * entry has been registered for that key yet.
    *
    * @param key  cache key
    * @param task by-name computation; evaluated only by the caller that
    *             wins the registration for `key`
    * @param ec   execution context used to propagate the task's outcome
    * @return a future completing with the result (or failure) recorded
    *         for `key`
    */
  def cache(key: String)(task: => Future[A])
    (implicit ec: ExecutionContext): Future[A] = {

    // Atomically either register a fresh promise (Left: we own the
    // computation) or pick up the one already registered (Right).
    // The callback may be retried on contention, which is harmless
    // because allocating a Promise has no observable side effect.
    val pullFromCache = ref.transformAndExtract { current =>
      current.get(key) match {
        case None =>
          val inst = Promise[A]()
          (Left(inst), current.updated(key, inst))
        case Some(value) =>
          (Right(value), current)
      }
    }

    pullFromCache match {
      case Left(promise) =>
        // Actual side effect. A synchronous throw while evaluating the
        // by-name task is converted into a failed future; otherwise the
        // registered promise would never complete and every later
        // caller for this key would wait forever.
        val running: Future[A] =
          try task
          catch { case scala.util.control.NonFatal(e) => Future.failed(e) }
        // Publish the outcome (success or failure) to all waiters.
        running.onComplete(result => promise.complete(result))
        promise.future
      case Right(promise) =>
        promise.future
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.implicits._ | |
import cats.effect.Concurrent | |
import cats.effect.concurrent.{Deferred, Ref} | |
/** A purely functional, concurrent cache keyed by `String`.
  *
  * For each key, the first caller registers a `Deferred`, runs the task
  * and publishes its outcome; every other caller for the same key waits
  * on that same `Deferred`. Failures are cached like successful results.
  *
  * @param ref registry of key -> effect yielding the `Deferred` for that
  *            key. Entries written by this class are already-allocated
  *            instances lifted with `pure`, so running the stored effect
  *            always yields the same `Deferred`.
  * @param F   the `Concurrent` instance for `F` (equivalent to a
  *            `F[_]: Concurrent` context bound, but named so it can be
  *            used directly)
  */
class Cache[F[_], A] private (
  ref: Ref[F, Map[String, F[Deferred[F, Either[Throwable, A]]]]]
)(implicit F: Concurrent[F]) {

  private type Slot = Deferred[F, Either[Throwable, A]]

  /** Returns the cached result for `key`, running `task` only if no
    * entry has been registered for that key yet.
    *
    * @param key  cache key
    * @param task effect to run when the key is not yet registered
    * @return the outcome recorded for `key`; a recorded failure is
    *         re-raised in `F`
    */
  def cache(key: String)(task: F[A]): F[A] = {
    // The Deferred must be allocated BEFORE entering `modify`: storing
    // the allocation effect itself would hand every reader a brand-new
    // Deferred that nobody completes, so readers would wait forever.
    // The cost is one unused allocation on a cache hit.
    val pullFromCache: F[Either[Slot, F[Slot]]] =
      Deferred[F, Either[Throwable, A]].flatMap { fresh =>
        ref.modify[Either[Slot, F[Slot]]] { current =>
          current.get(key) match {
            case None =>
              (current.updated(key, F.pure(fresh)), Left(fresh))
            case Some(existing) =>
              (current, Right(existing))
          }
        }
      }

    // Given the concurrent nature of this caching, the only reasonable
    // choice to deal with cancelation is to make it uncancelable:
    // otherwise a canceled owner would leave its Deferred incomplete.
    F.uncancelable {
      pullFromCache.flatMap {
        case Left(slot) =>
          for {
            outcome <- F.attempt(task) // <- actual side-effect
            _       <- slot.complete(outcome)
            value   <- F.fromEither(outcome)
          } yield value
        case Right(stored) =>
          stored
            .flatMap(slot => slot.get)
            .flatMap(outcome => F.fromEither(outcome))
      }
    }
  }
}
I created an alternative (and somewhat bigger) example for the pure cache example above with the following differences:
- Keys are optional and may be of any type
- Optionally allows for setting an expiry
- Adds a little test program that runs the sample
- To avoid the issue of the sample above, it does not create new `Deferred` instances inside `Ref.modify`.
https://gist.github.com/jenshalm/702122b0a9ddadf9e8b1f37638817489
If you neither use the optional keys nor the optional expiry, it is essentially equivalent to Async.memoize.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It seems to me, though, that the pure version cannot work the way it was written. Apart from several tweaks you need to make for it to compile, the second access for the same key will always fail, because the value stored in the Map is not a `Deferred` but an `F[Deferred]` that creates a new Deferred every time it is run. Therefore line 32 will never complete, as it calls `.get` on a new Deferred created by the `fa` effect, which is not the `df` completed in line 27. I confirmed this with a little test that hangs indefinitely as soon as you access any key a second time.