Last active
November 9, 2016 09:10
-
-
Save sisso/0309e0677da4e60b4044 to your computer and use it in GitHub Desktop.
Spray cache that updates values in the background. It keeps serving the old value until the new one has been computed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spray.caching | |
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap | |
import spray.util.Timestamp | |
import scala.annotation.tailrec | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
import scala.util.{Failure, Success, Try} | |
/**
 * A capacity-bounded cache with time-to-live / time-to-idle expiration that renews
 * expired values asynchronously.
 *
 * When `apply` finds an expired entry it starts a new computation via `genValue` but
 * keeps handing out the previous value until the new computation has completed
 * successfully. A failed renewal is ignored as long as a previous value exists:
 * callers keep receiving the previous value (see `Entry2.future`).
 *
 * Note that a failure is NOT evicted from the cache: if the very first computation
 * for a key fails, the failed future stays cached until the entry expires.
 *
 * @param maxCapacity     maximum weighted capacity handed to the underlying
 *                        `ConcurrentLinkedHashMap`
 * @param initialCapacity initial capacity of the underlying map
 * @param timeToLive      maximum age of an entry, measured from its creation
 * @param timeToIdle      maximum time an entry may stay unaccessed; when both durations
 *                        are finite, `timeToLive` must be greater than `timeToIdle`
 *
 * @see inspired by https://groups.google.com/forum/#!topic/spray-user/U1c48z4A6A0
 * @see spray.caching.ExpiringLruCache
 */
final class ExpiringLruCacheWithAsyncUpdate[V](maxCapacity: Long, initialCapacity: Int,
                                               timeToLive: Duration, timeToIdle: Duration) extends Cache[V] {
  require(!timeToLive.isFinite || !timeToIdle.isFinite || timeToLive > timeToIdle,
    s"timeToLive($timeToLive) must be greater than timeToIdle($timeToIdle)")

  private[caching] val store = new ConcurrentLinkedHashMap.Builder[Any, Entry2[V]]
    .initialCapacity(initialCapacity)
    .maximumWeightedCapacity(maxCapacity)
    .build()

  /**
   * Returns the future cached for `key` if an entry exists and is still alive,
   * refreshing its last-access timestamp. An expired entry is removed and
   * `None` is returned; this method never triggers a recomputation.
   */
  @tailrec
  def get(key: Any): Option[Future[V]] = store.get(key) match {
    case null => None
    case entry if isAlive(entry) =>
      entry.refresh()
      Some(entry.future)
    case entry =>
      // remove entry, but only if it hasn't been removed and reinserted in the meantime
      if (store.remove(key, entry)) None // successfully removed
      else get(key) // nope, try again
  }

  /**
   * Unconditionally stores an already-computed result for `key`, replacing any
   * existing entry. The new entry has no previous value to fall back to.
   */
  def set(key: Any, value: Try[V]): Unit = {
    val newEntry = new Entry2(Promise[V](), None)
    newEntry.promise.tryComplete(value)
    store.put(key, newEntry)
  }

  /**
   * Returns the cached future for `key`, computing it with `genValue` when no entry
   * exists or the existing entry has expired.
   *
   * In the expired case the returned future is the one of the freshly inserted entry,
   * which resolves to the previous value until (and unless) the new computation
   * completes successfully.
   *
   * @param key      the cache key
   * @param genValue produces the value; invoked at most once per call, and only when
   *                 this call is the one that has to (re)compute the value
   * @param ec       execution context used to propagate the computation's result
   *                 into the cache entry
   */
  def apply(key: Any, genValue: () => Future[V])(implicit ec: ExecutionContext): Future[V] = {
    // Inserts a new entry for `key`; `lastValue` is served while the new value is pending.
    def insert(lastValue: Option[Future[V]]): Future[V] = {
      val newEntry = new Entry2(Promise[V](), lastValue)
      val valueFuture =
        store.put(key, newEntry) match {
          case null => genValue()
          case entry =>
            if (isAlive(entry)) {
              // Someone else inserted a live entry between our lookup and our put.
              // we date back the new entry we just inserted
              // in the meantime someone might have already seen the too fresh timestamp we just put in,
              // but since the original entry is also still alive this doesn't matter
              newEntry.created = entry.created
              // Chain onto the raw promise, NOT `entry.future`: while that entry's refresh
              // is still pending, `entry.future` resolves to its stale previous value.
              // Completing our promise with it would pin the stale value in the store
              // (our entry has just replaced `entry`) and drop the fresh result.
              entry.promise.future
            } else genValue()
        }
      valueFuture.onComplete { value =>
        newEntry.promise.tryComplete(value)
      }
      newEntry.future
    }

    store.get(key) match {
      case null => insert(None)
      case entry if isAlive(entry) =>
        entry.refresh()
        entry.future
      case entry =>
        // expired: recompute in the background, keep serving the old value meanwhile
        insert(Some(entry.future))
    }
  }

  /**
   * Removes the entry for `key` and returns its future, or `None` if there was
   * no entry or the removed entry had already expired.
   */
  def remove(key: Any): Option[Future[V]] =
    Option(store.remove(key)).filter(isAlive).map(_.future)

  /** Removes all entries. */
  def clear(): Unit = { store.clear() }

  /** A snapshot of the keys currently in the store (expired entries included). */
  def keys: Set[Any] = store.keySet().asScala.toSet

  /**
   * Iterates over the keys in the store's ascending order, optionally limited to the
   * first `limit` keys. Ordering semantics are those of
   * `ConcurrentLinkedHashMap.ascendingKeySet`.
   */
  def ascendingKeys(limit: Option[Int] = None): Iterator[Any] =
    limit.map { lim => store.ascendingKeySetWithLimit(lim) }
      .getOrElse(store.ascendingKeySet())
      .iterator().asScala

  /** Number of entries currently in the store (expired entries included). */
  def size: Int = store.size

  // An entry is alive while neither its time-to-live nor its time-to-idle has elapsed.
  private def isAlive(entry: Entry2[V]): Boolean =
    (entry.created + timeToLive).isFuture &&
      (entry.lastAccessed + timeToIdle).isFuture
}
/**
 * A cache entry that can fall back to a previously cached value.
 *
 * @param promise   completed with the result of the computation for this entry
 * @param lastValue the future of the entry this one replaces, if any; it is handed out
 *                  instead of `promise.future` until `promise` completes successfully
 */
private[caching] class Entry2[T](val promise: Promise[T], val lastValue: Option[Future[T]]) {
  // Both timestamps start at construction time; `created` may be dated back by the cache.
  @volatile var created = Timestamp.now
  @volatile var lastAccessed = Timestamp.now

  /**
   * The future callers should observe for this entry: the entry's own future once it
   * has completed successfully, or when there is no previous value; otherwise (still
   * pending, or failed) the previous value.
   */
  def future: Future[T] = {
    val own = promise.future
    lastValue match {
      case Some(previous) if !own.value.exists(_.isSuccess) => previous
      case _ => own
    }
  }

  /** Marks the entry as just accessed. */
  def refresh(): Unit = {
    // we dont care whether we overwrite a potentially newer value
    lastAccessed = Timestamp.now
  }

  // Renders the currently observable result, or "pending" if there is none yet.
  override def toString = future.value.map {
    case Success(result) => result.toString
    case Failure(error) => error.toString
  }.getOrElse("pending")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment