Skip to content

Instantly share code, notes, and snippets.

@sisso
Last active November 9, 2016 09:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sisso/0309e0677da4e60b4044 to your computer and use it in GitHub Desktop.
Save sisso/0309e0677da4e60b4044 to your computer and use it in GitHub Desktop.
Spray cache that updates values in the background. It keeps serving the old value until it gets renewed.
package spray.caching
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import spray.util.Timestamp
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
/**
 * A variant of [[spray.caching.ExpiringLruCache]] that renews expired values in the background.
 *
 * When `apply` finds an expired entry it installs a new entry that carries the old entry's
 * future as a fallback and starts `genValue`. Until the new computation completes
 * successfully, callers keep receiving the old value. A failed renewal is ignored as long as
 * the fallback is usable; an entry that could only serve a failure is dropped so that the
 * next call retries.
 *
 * @param maxCapacity     passed to the store builder as its maximum weighted capacity
 * @param initialCapacity passed to the store builder as its initial capacity
 * @param timeToLive      an entry is considered expired once this much time has passed since
 *                        its creation timestamp
 * @param timeToIdle      an entry is considered expired once this much time has passed since
 *                        its last access; if both durations are finite, `timeToLive` must be
 *                        greater than `timeToIdle`
 *
 * @see inspired by https://groups.google.com/forum/#!topic/spray-user/U1c48z4A6A0
 * @see spray.caching.ExpiringLruCache
 */
final class ExpiringLruCacheWithAsyncUpdate[V](maxCapacity: Long, initialCapacity: Int,
                                               timeToLive: Duration, timeToIdle: Duration) extends Cache[V] {
  require(!timeToLive.isFinite || !timeToIdle.isFinite || timeToLive > timeToIdle,
    s"timeToLive($timeToLive) must be greater than timeToIdle($timeToIdle)")

  private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Entry2[V]]
    .initialCapacity(initialCapacity)
    .maximumWeightedCapacity(maxCapacity)
    .build()

  /**
   * Returns the future cached for `key` if there is a live entry, refreshing its last-access
   * timestamp. An expired entry is removed from the store and `None` is returned, which also
   * means its stale value is no longer available as a fallback for a later `apply`.
   */
  @tailrec
  def get(key: Any): Option[Future[V]] = store.get(key) match {
    case null ⇒ None
    case entry if (isAlive(entry)) ⇒
      entry.refresh()
      Some(entry.future)
    case entry ⇒
      // remove entry, but only if it hasn't been removed and reinserted in the meantime
      if (store.remove(key, entry)) None // successfully removed
      else get(key) // nope, try again
  }

  /**
   * Unconditionally stores an already computed result (success or failure) under `key`,
   * replacing whatever entry was there. The new entry has no fallback value.
   */
  def set(key: Any, value: Try[V]): Unit = {
    val newEntry = new Entry2(Promise[V](), None)
    newEntry.promise.tryComplete(value)
    store.put(key, newEntry)
  }

  /**
   * Returns the cached future for `key`, computing or renewing it with `genValue` as needed.
   *
   *  - no entry: a new entry is stored and `genValue` is started; the returned future
   *    completes with its result.
   *  - live entry: its last-access timestamp is refreshed and its future is returned.
   *  - expired entry: a new entry is stored with the old entry's future as fallback and
   *    `genValue` is started; the fallback keeps being served until the new computation
   *    succeeds.
   *
   * @param key      the cache key
   * @param genValue producer of the value; invoked at most once per call
   * @param ec       execution context the completion callback runs on
   */
  def apply(key: Any, genValue: () ⇒ Future[V])(implicit ec: ExecutionContext): Future[V] = {
    // NOTE(review): if genValue() throws synchronously, the exception propagates to the caller
    // while the freshly stored entry stays in the store with a promise that is never completed.
    def insert(lastValue: Option[Future[V]]): Future[V] = {
      val newEntry = new Entry2(Promise[V](), lastValue)
      val valueFuture =
        store.put(key, newEntry) match {
          case null ⇒ genValue()
          case entry ⇒
            if (isAlive(entry)) {
              // we date back the new entry we just inserted
              // in the meantime someone might have already seen the too fresh timestamp we just put in,
              // but since the original entry is also still alive this doesn't matter
              newEntry.created = entry.created
              entry.future
            } else genValue()
        }
      valueFuture.onComplete { value ⇒
        newEntry.promise.tryComplete(value)
        // A failed computation is only tolerated while the entry can still serve a good
        // (or still pending) fallback. If all it could hand out is a failure, drop it so the
        // next call tries again instead of serving the cached failure for a whole TTL.
        // The two-argument remove leaves the mapping alone if the entry was replaced meanwhile.
        if (newEntry.future.value.exists(_.isFailure)) store.remove(key, newEntry)
      }
      newEntry.future
    }

    store.get(key) match {
      case null ⇒ insert(None)
      case entry if (isAlive(entry)) ⇒
        entry.refresh()
        entry.future
      case entry ⇒
        // expired: renew in the background, keep serving what the old entry served
        insert(Some(entry.future))
    }
  }

  /**
   * Removes the entry for `key` from the store.
   *
   * @return the removed entry's future if the entry was still alive, otherwise `None`
   */
  def remove(key: Any): Option[Future[V]] = store.remove(key) match {
    case null ⇒ None
    case entry if (isAlive(entry)) ⇒ Some(entry.future)
    case _ ⇒ None
  }

  /** Removes all entries from the store. */
  def clear(): Unit = { store.clear() }

  /** A snapshot of the keys currently in the store (expired entries included). */
  def keys: Set[Any] = store.keySet().asScala.toSet

  /**
   * Iterates over the store's ascending key set, optionally restricted to `limit` keys.
   */
  def ascendingKeys(limit: Option[Int] = None): Iterator[Any] =
    limit.map { lim ⇒ store.ascendingKeySetWithLimit(lim) }
      .getOrElse(store.ascendingKeySet())
      .iterator().asScala

  /** Number of entries currently in the store (expired entries included). */
  def size: Int = store.size

  /** An entry is alive while neither its time-to-live nor its time-to-idle has elapsed. */
  private def isAlive(entry: Entry2[V]): Boolean =
    (entry.created + timeToLive).isFuture &&
      (entry.lastAccessed + timeToIdle).isFuture
}
/**
 * A cache slot: the promise for the value currently being computed plus, optionally, the
 * future that was served before this entry was created.
 *
 * @param promise   completed with the outcome of the current computation
 * @param lastValue future to serve while `promise` has not completed successfully
 */
private[caching] class Entry2[T](val promise: Promise[T], val lastValue: Option[Future[T]]) {
  // timestamps used by the cache for its time-to-live and time-to-idle checks
  @volatile var created = Timestamp.now
  @volatile var lastAccessed = Timestamp.now

  /**
   * The future this entry currently serves: the promise's own future when there is no
   * previous value or once the promise has completed successfully; otherwise (still pending
   * or failed) the previous value.
   */
  def future: Future[T] = {
    val current = promise.future
    lastValue.fold(current) { previous ⇒
      if (current.value.exists(_.isSuccess)) current else previous
    }
  }

  /** Marks the entry as just accessed. */
  def refresh(): Unit = {
    // a concurrent writer may have stored a slightly newer timestamp; overwriting it is harmless
    lastAccessed = Timestamp.now
  }

  /** Renders the served result, the served exception, or "pending" if nothing is available yet. */
  override def toString = future.value.fold("pending") {
    case Success(result) ⇒ result.toString
    case Failure(error) ⇒ error.toString
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment