Skip to content

Instantly share code, notes, and snippets.

@swallez
Created September 18, 2018 19:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save swallez/3e2078e2bd925107bffad27bd8ccc072 to your computer and use it in GitHub Desktop.
Save swallez/3e2078e2bd925107bffad27bd8ccc072 to your computer and use it in GitHub Desktop.
A caching wrapper around an async function
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
object CacheUtils {
private case class CacheInfo[T](p: Promise[T], expires: Long) {
/* Return the promise if it's not yet completed or hasn't expired */
def get: Option[Promise[T]] = {
if(!p.isCompleted || System.nanoTime() < expires) Some(p) else None
}
}
/** Caches the result of an async function, refreshing it at most one every `duration` */
def cache[T](duration: Duration)(f: () => Future[T])(implicit ec: ExecutionContext) : () => Future[T] = {
val cached = new AtomicReference[CacheInfo[T]](null)
def refresh(current: CacheInfo[T]): Unit = {
val info = CacheInfo(Promise[T], System.nanoTime() + duration.toNanos)
if (cached.compareAndSet(current, info)) {
f().onComplete(result => info.p.complete(result))
}
}
refresh(cached.get)
() => {
val info = cached.get
info.get match {
case Some(promise) =>
promise.future
case None =>
refresh(info)
cached.get.p.future
}
}
}
// Silly example that shows that we refresh at most once every caching period
def main(args: Array[String]): Unit = {
import ExecutionContext.Implicits._
val changesEvery100ms = cache(100.millis) {
() => Future.successful(System.currentTimeMillis())
}
var value = System.currentTimeMillis()
val end = value + 1000
while(value < end) {
val newValue = Await.result(changesEvery100ms(), Duration.Inf)
if (newValue != value) {
value = newValue
println(value)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment