Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active May 26, 2019 20:06
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save alexandru/4e02c5ff340756e7dcc07454d1308324 to your computer and use it in GitHub Desktop.
Save alexandru/4e02c5ff340756e7dcc07454d1308324 to your computer and use it in GitHub Desktop.
import cats.implicits._
import monix.execution.atomic.Atomic
import scala.concurrent.{ExecutionContext, Future, Promise}
class Cache[A] private (ref: Atomic[Map[String, Promise[A]]]) {
def cache(key: String)(task: => Future[A])
(implicit ec: ExecutionContext): Future[A] = {
val pullFromCache = ref.transformAndExtract { current =>
current.get(key) match {
case None =>
val inst = Promise[A]()
(Left(inst), current.updated(key, inst))
case Some(value) =>
(Right(value), current)
}
}
pullFromCache match {
case Left(fa) =>
// actual side-effect
task.attempt.flatMap { ea =>
fa.complete(a)
Future.fromTry(ea.toTry)
}
case Right(fa) =>
fa.future
}
}
}
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.{Deferred, Ref}
class Cache[F[_] : Concurrent, A] private (ref: Ref[F, Map[String, F[Deferred[F, Either[Throwable, A]]]]]) {
def cache(key: String)(task: F[A]): F[A] = {
val pullFromCache = ref.modify { current =>
current.get(key) match {
case None =>
val inst = Deferred.apply[F, A]
(current.updated(key, inst), Left(inst))
case Some(value) =>
(current, Right(value))
}
}
// Given the concurrent nature of this caching,
// the only reasonable choice to deal with cancelation is to
// make it uninterruptible
F.uninterruptible {
pullFromCache.flatMap {
case Left(fa) =>
for {
df <- fa
ea <- task.attempt // <- actual side-effect
_ <- df.complete(ea)
a <- F.rethrow(ea)
} yield a
case Right(fa) =>
fa.flatMap(_.get.flatMap(F.rethrow))
}
}
}
}
@kubukoz
Copy link

kubukoz commented May 2, 2019

Is pullFromCache.flatMap fiber safe? I imagine cancelation could hit in the meantime and you'd never complete the Deferred. Same if it hits before df.complete in the for comprehension.

@danilbykov
Copy link

It seems you need to handle case when task fails with Exception. In this case Deferred is never completed.

@cultureofone
Copy link

@alexandru
Copy link
Author

@kubukoz no, but given the concurrent nature of this thing, the only solution is to make it uninterruptible

@danilbykov indeed, we need to handle exceptions, that's left as an exercise for the reader

@cultureofone sure, you can work with Coeval.evalOnce in this case as well, but note that it has different behavior in terms of multi-threading ... more than one thread can access that instance at the same time, in which case one of them will block; this might not be a problem of course, depending on your use-case

@alexandru
Copy link
Author

I added error handling and made that task uninterruptible.

@cultureofone
Copy link

@alexandru As I understood from original request author (in Twitter) there is no need for multiple thread support.

@cultureofone
Copy link

@alexandru I added multi-thread version which uses Task.evalOnce - still shorter than your Promise+Future version.
Plus demo which demonstrates that it actually works in parallel:
https://gist.github.com/cultureofone/66810c1609fa02cb5144599476873284

@cultureofone
Copy link

cultureofone commented May 4, 2019

@alexandru I see that in "pure" version Promise+Future logic is changed with Deferred logic.
Although you made things needlessly more complicated for this kind of objective, this is great example of Cats Effect usage and now I finally understand practical need for Cats (Effect) library: it is composable as... something very composable. Only question left is about debug-ability.
Thank You for that ;)

@jenshalm
Copy link

It seems to me though that the pure version cannot work the way it was written. Apart from several tweaks you need to make to have it compile, the second access for the same key will always fail as the value stored in the Map is not a Deferred, but an F[Deferred] that always creates a new Deferred when run. Therefore line 32 will never complete, as it tries to call .get on a new Deferred created by the fa effect which is not the df completed in line 27. Confirmed this will a little test that hangs indefinitely as soon as you access any key a second time.

@jenshalm
Copy link

jenshalm commented May 26, 2019

I created an alternative (and somewhat bigger) example for the pure cache example above with the following differences:

  1. Keys are optional and may be of any type
  2. Optionally allows for setting an expiry
  3. Adds a little test program that runs the sample
  4. To avoid the issue of the sample above it does not create new Deferred instances inside Ref.modify.

https://gist.github.com/jenshalm/702122b0a9ddadf9e8b1f37638817489

If you neither use the optional keys nor the optional expiry, it is essentially equivalent to Async.memoize.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment