Created
May 25, 2012 16:49
-
-
Save 9len/2789151 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.servo.repository | |
import com.twitter.conversions.time._ | |
import com.twitter.servo.cache._ | |
import com.twitter.logging.Logger | |
import com.twitter.util.{Duration, Future, Return, Time, Throw, Try} | |
import scala.collection.mutable | |
import scala.util.Random | |
/**
 * Base type for the possible outcomes of looking a single key up in cache.
 * The concrete states are the case classes declared in the `CachedResult` object.
 *
 * @tparam K the key type
 * @tparam V the cached value type
 */
sealed abstract class CachedResult[+K, +V] {
  /** The key this result pertains to. */
  def key: K
}
object CachedResult { | |
import CachedResultAction._ | |
import CachingKeyValueRepository._ | |
/** The key was absent from the cache. */
case class NotFound[K](key: K) extends CachedResult[K, Nothing]

/** Fetching the key failed with the throwable `t`. */
case class Failed[K](key: K, t: Throwable) extends CachedResult[K, Nothing]

/** A cached entry existed for the key but could not be deserialized. */
case class DeserializationFailed[K](key: K) extends CachedResult[K, Nothing]

/** The value for the key could not be serialized. */
case class SerializationFailed[K](key: K) extends CachedResult[K, Nothing]

/** A NotFound tombstone, written at `cachedAt`, was found in cache. */
case class CachedNotFound[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

/** A Deleted tombstone, written at `cachedAt`, was found in cache. */
case class CachedDeleted[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

/** A value, written at `cachedAt`, was found in cache. */
case class CachedFound[K, V](key: K, value: V, cachedAt: Time) extends CachedResult[K, V]
/** Decides, for every CachedResult, which CachedResultAction to take. */
type Handler[K, V] = CachedResult[K, V] => CachedResultAction[V]

/** Like Handler, but may decline to decide by returning None. */
type PartialHandler[K, V] = CachedResult[K, V] => Option[CachedResultAction[V]]

/** Builds the Handler to use for a given query. */
type HandlerFactory[Q <: Seq[K], K, V] = Q => Handler[K, V]
/**
 * Helpers for building, combining and terminating PartialHandlers.
 *
 * This is a named object rather than an anonymous `new { ... }` value: members of an
 * anonymous structural value are invoked reflectively, whereas members of an object
 * are ordinary method calls. The `terminate` overloads are spelled out explicitly
 * (with and without `handler`) because only one overloaded alternative of a method
 * may declare default arguments.
 */
object PartialHandler {
  /**
   * Sugar to produce a PartialHandler from a PartialFunction. Successive calls to
   * isDefinedAt MUST return the same result. Otherwise, take the syntax hit and wire
   * up your own PartialHandler.
   *
   * @param partial the partial function to lift
   * @return a PartialHandler yielding None wherever `partial` is undefined
   */
  def apply[K, V](
    partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
  ): PartialHandler[K, V] = partial.lift

  /**
   * Compose one PartialHandler with another to produce a new PartialHandler.
   * `thatHandler` is consulted only when `thisHandler` returns None.
   */
  def compose[K, V](
    thisHandler: PartialHandler[K, V],
    thatHandler: PartialHandler[K, V]
  ): PartialHandler[K, V] = { cachedResult =>
    thisHandler(cachedResult) orElse thatHandler(cachedResult)
  }

  /**
   * Terminate a PartialHandler with the default Handler to produce a new Handler.
   */
  def terminate[K, V](partial: PartialHandler[K, V]): Handler[K, V] =
    terminate[K, V](partial, defaultHandler[K, V])

  /**
   * Terminate a PartialHandler to produce a new Handler.
   *
   * @param partial consulted first
   * @param handler used for every result that `partial` declines
   */
  def terminate[K, V](
    partial: PartialHandler[K, V],
    handler: Handler[K, V]
  ): Handler[K, V] = { cachedResult =>
    partial(cachedResult).getOrElse(handler(cachedResult))
  }

  /**
   * Lift a PartialFunction to a PartialHandler and terminate it with the default Handler.
   */
  def terminate[K, V](
    partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
  ): Handler[K, V] =
    terminate[K, V](partial, defaultHandler[K, V])

  /**
   * Lift a PartialFunction to a PartialHandler and terminate it.
   *
   * @param partial consulted first
   * @param handler used for every result on which `partial` is undefined
   */
  def terminate[K, V](
    partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    handler: Handler[K, V]
  ): Handler[K, V] =
    terminate[K, V](apply(partial), handler)
}
/**
 * A HandlerFactory that ignores the query and always produces the default Handler.
 */
def defaultHandlerFactory[Q <: Seq[K], K, V]: HandlerFactory[Q, K, V] = {
  (_: Q) => defaultHandler[K, V]
}
/**
 * The default Handler. Cache failures and (de)serialization failures are treated
 * as misses, tombstones as not-found, and cached values as found.
 */
def defaultHandler[K, V]: Handler[K, V] = {
  case CachedFound(_, value, _) => HandleAsFound(value)
  case CachedNotFound(_, _) | CachedDeleted(_, _) => HandleAsNotFound
  case NotFound(_) | Failed(_, _) => HandleAsMiss
  case DeserializationFailed(_) | SerializationFailed(_) => HandleAsMiss
}
/**
 * A PartialHandler that bubbles cache fetch failures up as failures instead of
 * letting them be converted to misses. It is undefined for every other result.
 */
def failuresAreFailures[K, V]: PartialHandler[K, V] =
  PartialHandler[K, V] {
    case Failed(_, cause) => HandleAsFailed(cause)
  }
/**
 * Soft-expires CachedFound and CachedNotFound results based on a ttl. Results that
 * have not yet expired, and all other kinds of result, are left undecided.
 *
 * @param ttl
 *   values older than this will be considered expired, but still
 *   returned, and asynchronously refreshed in cache.
 * @param expiry
 *   (optional) function to compute the expiry time
 */
def softTtlExpiration[K, V](
  ttl: Duration,
  expiry: Expiry = fixedExpiry
): PartialHandler[K, V] = {
  // true once the computed expiry time lies in the past
  def isStale(cachedAt: Time): Boolean = expiry(cachedAt, ttl) < Time.now

  PartialHandler[K, V] {
    case CachedFound(_, value, cachedAt) if isStale(cachedAt) =>
      SoftExpiration(HandleAsFound(value))
    case CachedNotFound(_, cachedAt) if isStale(cachedAt) =>
      SoftExpiration(HandleAsNotFound)
  }
}
/**
 * Hard-expires CachedFound and CachedNotFound results based on a ttl. Results that
 * have not yet expired, and all other kinds of result, are left undecided.
 *
 * @param ttl
 *   values older than this will be considered a miss
 * @param expiry
 *   (optional) function to compute the expiry time
 */
def hardTtlExpiration[K, V](
  ttl: Duration,
  expiry: Expiry = fixedExpiry
): PartialHandler[K, V] = {
  // true once the computed expiry time lies in the past
  def isStale(cachedAt: Time): Boolean = expiry(cachedAt, ttl) < Time.now

  PartialHandler[K, V] {
    case CachedFound(_, _, cachedAt) if isStale(cachedAt) => HandleAsMiss
    case CachedNotFound(_, cachedAt) if isStale(cachedAt) => HandleAsMiss
  }
}
/**
 * Hard-expires a CachedNotFound tombstone based on a ttl: an expired tombstone is
 * handled as a miss, an unexpired one as not-found. Other results are left undecided.
 *
 * @param ttl
 *   values older than this will be considered expired
 * @param expiry
 *   (optional) function to compute the expiry time
 */
def notFoundHardTtlExpiration[K, V](
  ttl: Duration,
  expiry: Expiry = fixedExpiry
): PartialHandler[K, V] =
  PartialHandler[K, V] {
    case CachedNotFound(_, cachedAt) =>
      val expired = expiry(cachedAt, ttl) < Time.now
      if (expired) HandleAsMiss else HandleAsNotFound
  }
/**
 * Hard-expires a CachedDeleted tombstone based on a ttl: an expired tombstone is
 * handled as a miss, an unexpired one as not-found. Other results are left undecided.
 *
 * @param ttl
 *   values older than this will be considered expired
 * @param expiry
 *   (optional) function to compute the expiry time
 */
def deletedHardTtlExpiration[K, V](
  ttl: Duration,
  expiry: Expiry = fixedExpiry
): PartialHandler[K, V] =
  PartialHandler[K, V] {
    case CachedDeleted(_, cachedAt) =>
      val expired = expiry(cachedAt, ttl) < Time.now
      if (expired) HandleAsMiss else HandleAsNotFound
  }
/**
 * Builds a HandlerFactory that picks the Handler per query: when shouldReadThrough
 * evaluates to false for the query, results come exclusively from cache (cacheOnly)
 * with no read through to the underlying KeyValueRepository; otherwise the
 * readThroughHandler is used.
 *
 * @param shouldReadThrough predicate evaluated against each query
 * @param readThroughHandler the Handler used when the predicate holds
 */
def gatedReadThrough[Q <: Seq[K], K, V](
  shouldReadThrough: Q => Boolean,
  readThroughHandler: Handler[K, V]
): HandlerFactory[Q, K, V] = { query =>
  if (!shouldReadThrough(query)) cacheOnly[K, V] else readThroughHandler
}
/**
 * A Handler that reads only from cache and never falls back to the underlying
 * KeyValueRepository: cached values are found, everything else is not-found.
 */
def cacheOnly[K, V]: Handler[K, V] = {
  case CachedFound(_, value, _) => HandleAsFound(value)
  case _ => HandleAsNotFound
}
} | |
object CachingKeyValueRepository {
  /**
   * A function that takes a cachedAt time and a ttl, and returns an expiry time. This
   * function _must_ be deterministic with respect to the arguments provided, otherwise
   * you might get a MatchError when using this with softTtlExpiration.
   */
  type Expiry = (Time, Duration) => Time

  /**
   * An Expiry function with an epsilon of zero: the expiry time is exactly
   * cachedAt + ttl.
   */
  val fixedExpiry: Expiry = (cachedAt, ttl) => cachedAt + ttl

  /**
   * A repeatable "random" expiry function that perturbs the ttl with a random value
   * no greater than +/-(ttl * maxFactor). A maxFactor of zero yields fixedExpiry.
   *
   * @param maxFactor the maximum fraction of the ttl by which to perturb it
   */
  def randomExpiry(maxFactor: Float): Expiry = {
    if (maxFactor != 0) {
      (cachedAt: Time, ttl: Duration) => {
        // seeding with cachedAt makes the perturbation repeatable for a given entry
        val rng = new Random(cachedAt.inMilliseconds)
        val factor = (2 * rng.nextFloat - 1) * maxFactor
        cachedAt + ttl + (factor * ttl).toLong.milliseconds
      }
    } else {
      fixedExpiry
    }
  }

  /**
   * A function that implements a read-repair operation.
   */
  type ReadRepairer[Q, V] = (Q, Seq[V]) => Future[Seq[Try[V]]]

  /**
   * Usable wherever a ReadRepairer is expected, but just throws an
   * UnsupportedOperationException when invoked.
   */
  val noRepair: (Any, Seq[Any]) => Nothing =
    (_, _) => throw new UnsupportedOperationException
}
import CachingKeyValueRepository._ | |
/** | |
* Reads keyed values from a LockingCache, and reads through to an underlying | |
* KeyValueRepository for misses. supports a "soft ttl", beyond which values | |
* will be read through out-of-band to the originating request | |
* | |
* @param underlying | |
* the underlying KeyValueRepository | |
* @param cache | |
* the locking cache to read from | |
* @param newQuery
* a function for converting a subset of the keys of the original query into a new | |
* query. this is used to construct the query passed to the underlying repository | |
* to fetch the cache misses. | |
* @param handlerFactory
* a function that, given the query, produces the handler specifying the policy for
* how to handle results from cache, e.g. whether failures are handled as misses or as failures.
* @param picker | |
* used to choose between the value in cache and the value read from the DB when | |
* storing values in the cache | |
* @param observer | |
* a CacheObserver for collecting cache statistics | |
* @param readRepairer | |
* a function that can repair cached items. this is only meaningful if the cachedResultHandler | |
* can potentially return a value of ReadRepair. | |
*/ | |
class CachingKeyValueRepository[Q <: Seq[K], K, V]( | |
underlying: KeyValueRepository[Q, K, V], | |
cache: LockingCache[K, Cached[V]], | |
newQuery: SubqueryBuilder[Q, K], | |
handlerFactory: CachedResult.HandlerFactory[Q, K, V] = | |
CachedResult.defaultHandlerFactory[Q, K, V], | |
picker: LockingCache.Picker[Cached[V]] = new PreferNewestCached[V], | |
observer: CacheObserver = NullCacheObserver, | |
readRepairer: ReadRepairer[Q, V] = noRepair) | |
extends KeyValueRepository[Q, K, V] | |
{ | |
/**
 * Legacy constructor that accepts a PartialFunction instead of a HandlerFactory.
 * The partial function is terminated with the default Handler, and the resulting
 * Handler is used regardless of the query.
 */
@deprecated("Use HandlerFactory instead of PartialFunction")
def this(
  underlying: KeyValueRepository[Q, K, V],
  cache: LockingCache[K, Cached[V]],
  newQuery: SubqueryBuilder[Q, K],
  partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
  picker: LockingCache.Picker[Cached[V]] = new PreferNewestCached[V],
  observer: CacheObserver = NullCacheObserver,
  readRepairer: ReadRepairer[Q, V] = noRepair
) =
  this(
    underlying,
    cache,
    newQuery,
    _ => CachedResult.PartialHandler.terminate(partialHandler),
    picker,
    observer,
    readRepairer
  )
import CachedResult._ | |
import CachedResultAction._ | |
/** Logger named after the simple name of the concrete class. */
protected[this] val log = Logger.get(getClass.getSimpleName)

/** Observer scope ("effective") under which soft-expired keys are counted as misses. */
protected[this] val effectiveCacheStats = observer.scope("effective")
/**
 * The cache results for one query, bucketed by the action chosen for each key.
 *
 * @param hits keys handled as found, with their values
 * @param misses keys handled as misses
 * @param failures keys handled as failed, with the associated throwable
 * @param tombstones keys handled as not-found
 * @param softExpirations keys flagged as soft-expired (each is also in another bucket)
 * @param repairs keys flagged for read-repair, paired with the value to repair
 */
protected case class ProcessedCacheResult(
  hits: Map[K, V],
  misses: Seq[K],
  failures: Map[K, Throwable],
  tombstones: Set[K],
  softExpirations: Seq[K],
  repairs: Seq[(K, V)]
)
/**
 * Looks the keys up in cache, buckets the results via the handler for this query,
 * reads misses through to the underlying repository, read-repairs the flagged items,
 * and merges everything into one result. Soft-expired keys are additionally refreshed
 * through the underlying repository without the caller waiting on that refresh.
 *
 * @param keys the query
 * @return the merged results from cache, the underlying repository and read-repair
 */
override def apply(keys: Q): Future[KeyValueResult[K, V]] = {
  getFromCache(keys) flatMap { cacheResult =>
    val ProcessedCacheResult(hits, misses, failures, tombstones, softExpirations, repairs) =
      process(keys, cacheResult)
    val (repairKeys, repairItems) = repairs.unzip

    recordCacheStats(keys, misses.toSet, softExpirations.toSet)

    // now read through all notFound
    val futureFromUnderlying = readThrough(newQuery(misses, keys))
    val futureFromRepair = readRepair(newQuery(repairKeys, keys), repairItems)

    // async read-through for the expired results. the result is not part of the
    // response, but a failure is logged: nothing else observes this future, so it
    // would otherwise be lost without a trace.
    readThrough(newQuery(softExpirations, keys)) onFailure { case t =>
      log.error(t, "exception caught in soft-expiration read through")
    }

    // merge all results together
    for {
      fromUnderlying <- futureFromUnderlying
      fromRepair <- futureFromRepair
      fromCache = KeyValueResult(hits, tombstones, failures)
    } yield KeyValueResult.sum(Seq(fromCache, fromUnderlying, fromRepair))
  }
}
/**
 * Fetches the distinct keys from cache. If the cache lookup as a whole fails, the
 * exception is logged and every key is reported as failed with that exception.
 *
 * @param keys the keys to look up (duplicates are removed before the lookup)
 */
protected[this] def getFromCache(keys: Seq[K]): Future[KeyValueResult[K, Cached[V]]] = {
  val uniqueKeys = keys.distinct
  cache.get(uniqueKeys) handle { case t =>
    log.error(t, "exception caught in cache get")
    // treat total cache failure as a fetch that returned all failures
    KeyValueResult(failed = uniqueKeys.map { _ -> t }.toMap)
  }
}
/**
 * Buckets cache results according to the wishes of the Handler that the
 * handlerFactory produces for this query.
 *
 * @param keys the query; every key in it is classified, in order
 * @param cacheResult the raw results of the cache lookup
 */
protected[this] def process(
  keys: Q,
  cacheResult: KeyValueResult[K, Cached[V]]
): ProcessedCacheResult = {
  val handle = handlerFactory(keys)

  val found = new mutable.HashMap[K, V]
  val missed = new mutable.ListBuffer[K]
  val failed = new mutable.HashMap[K, Throwable]
  val notFound = new mutable.ListBuffer[K]
  val softExpired = new mutable.ListBuffer[K]
  val toRepair = new mutable.ListBuffer[(K, V)]

  // translates the raw cache lookup for one key into a CachedResult
  def classify(key: K): CachedResult[K, V] =
    cacheResult(key) match {
      case Return(Some(cached)) =>
        cached.status match {
          case CachedValueStatus.Found =>
            cached.value match {
              case Some(value) => CachedFound(key, value, cached.cachedAt)
              // a Found entry that carries no value is treated as absent
              case None => NotFound(key)
            }
          case CachedValueStatus.NotFound => CachedNotFound(key, cached.cachedAt)
          case CachedValueStatus.Deleted => CachedDeleted(key, cached.cachedAt)
          case CachedValueStatus.SerializationFailed => SerializationFailed(key)
          case CachedValueStatus.DeserializationFailed => DeserializationFailed(key)
          case CachedValueStatus.Evicted => NotFound(key)
        }
      case Return(None) => NotFound(key)
      case Throw(t) => Failed(key, t)
    }

  // files the key into the bucket matching the action; a SoftExpiration marks the
  // key as soft-expired and then files it again according to the wrapped action
  def record(key: K, action: CachedResultAction[V]): Unit =
    action match {
      case HandleAsMiss => missed += key
      case HandleAsFound(value) => found(key) = value
      case HandleAsNotFound => notFound += key
      case HandleAsFailed(t) => failed(key) = t
      case ReadRepair(value) => toRepair += (key -> value)
      case SoftExpiration(subaction) =>
        softExpired += key
        record(key, subaction)
    }

  keys foreach { key =>
    record(key, handle(classify(key)))
  }

  ProcessedCacheResult(
    found.toMap,
    missed,
    failed.toMap,
    notFound.toSet,
    softExpired,
    toRepair)
}
/**
 * Records a hit or a miss for every key, once on the "effective" scope and once on
 * the observer itself. The effective scope counts soft-expired keys as misses; the
 * observer counts only keys in notFound as misses.
 *
 * @param keys all keys of the query
 * @param notFound the keys that were handled as misses
 * @param expired the keys that were soft-expired
 */
protected[this] def recordCacheStats(keys: Seq[K], notFound: Set[K], expired: Set[K]): Unit = {
  for (key <- keys) {
    val name = key.toString
    val missed = notFound.contains(key)

    if (missed || expired.contains(key)) effectiveCacheStats.miss(name)
    else effectiveCacheStats.hit(name)

    if (missed) observer.miss(name)
    else observer.hit(name)
  }
}
/**
 * Reads through to the underlying repository and, when that succeeds, writes the
 * result to cache. An empty query short-circuits to an empty result without
 * touching the underlying repository.
 *
 * @param keys
 *   the keys to read
 */
def readThrough(keys: Q): Future[KeyValueResult[K, V]] =
  if (keys.nonEmpty) {
    underlying(keys) onSuccess { fetched =>
      writeToCache(keys, fetched)
    }
  } else {
    KeyValueResult.emptyFuture
  }
/**
 * Read-repairs the specified items via the readRepairer and, when that succeeds,
 * writes the updated results back to cache. An empty query short-circuits to an
 * empty result without invoking the readRepairer.
 *
 * @param keys the keys being repaired
 * @param items the values handed to the readRepairer
 */
def readRepair(keys: Q, items: Seq[V]): Future[KeyValueResult[K, V]] =
  if (keys.nonEmpty) {
    val repaired = KeyValueResult.fromSeqTry(keys) {
      readRepairer(keys, items)
    }
    repaired onSuccess { result =>
      writeToCache(keys, result)
    }
  } else {
    KeyValueResult.emptyFuture
  }
/**
 * Writes the contents of the given KeyValueResult to cache. A returned value is
 * cached as Found, a returned absence as a NotFound entry, and a Throw is skipped.
 * Failures of individual cache writes are logged.
 *
 * @param keys the keys whose results should be written
 * @param result the results to write
 */
def writeToCache(keys: Q, result: KeyValueResult[K, V]): Unit = {
  // built on first use, so every entry written by this call shares one timestamp
  lazy val notFoundEntry = {
    val now = Time.now
    Cached[V](None, CachedValueStatus.NotFound, now, Some(now))
  }

  // only cache Returns from the underlying repo, skip Throws
  def toCacheEntry(key: K): Option[Cached[V]] =
    result(key) match {
      case Throw(_) => None
      case Return(None) => Some(notFoundEntry)
      case Return(Some(value)) =>
        Some(notFoundEntry.copy(
          value = Some(value),
          status = CachedValueStatus.Found))
    }

  keys foreach { key =>
    toCacheEntry(key) foreach { entry =>
      cache.lockAndSet(key, LockingCache.PickingHandler(entry, picker)) onFailure { case t =>
        log.error(t, "exception caught in lockAndSet")
      }
    }
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm into this.