Skip to content

Instantly share code, notes, and snippets.

@9len
Created May 25, 2012 16:49
Show Gist options
  • Save 9len/2789151 to your computer and use it in GitHub Desktop.
Save 9len/2789151 to your computer and use it in GitHub Desktop.
package com.twitter.servo.repository
import com.twitter.conversions.time._
import com.twitter.servo.cache._
import com.twitter.logging.Logger
import com.twitter.util.{Duration, Future, Return, Time, Throw, Try}
import scala.collection.mutable
import scala.util.Random
/**
 * A set of classes representing the various states for a cached result.
 *
 * The concrete cases (NotFound, Failed, CachedFound, ...) are defined in the companion
 * object. The hierarchy is sealed, so pattern matches over it can be checked for
 * exhaustiveness by the compiler.
 *
 * @tparam K the key type
 * @tparam V the value type (Nothing for cases that carry no value)
 */
sealed abstract class CachedResult[+K, +V] {
/** The key this result was looked up under. */
def key: K
}
object CachedResult {
  import CachedResultAction._
  import CachingKeyValueRepository._

  /** Indicates the key was not in cache. */
  case class NotFound[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates there was an error fetching the key. */
  case class Failed[K](key: K, t: Throwable) extends CachedResult[K, Nothing]

  /** Indicates the cached value could not be deserialized. */
  case class DeserializationFailed[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates the cached value could not be serialized. */
  case class SerializationFailed[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates that a NotFound tombstone was found in cache. */
  case class CachedNotFound[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

  /** Indicates that a Deleted tombstone was found in cache. */
  case class CachedDeleted[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

  /** Indicates that a value was found in cache. */
  case class CachedFound[K, V](key: K, value: V, cachedAt: Time) extends CachedResult[K, V]

  /** A total policy: maps every CachedResult to the action to take for it. */
  type Handler[K, V] = CachedResult[K, V] => CachedResultAction[V]

  /** A partial policy: returns None for results it has no opinion about. */
  type PartialHandler[K, V] = CachedResult[K, V] => Option[CachedResultAction[V]]

  /** Chooses a Handler for each query. */
  type HandlerFactory[Q <: Seq[K], K, V] = Q => Handler[K, V]

  /**
   * Helpers for building and combining PartialHandlers.
   *
   * This is a plain object rather than a `new { ... }` value: members of such an anonymous
   * structural type are invoked via Java reflection, while object members are ordinary calls.
   * Call sites (`PartialHandler[K, V] { ... }`, `PartialHandler.terminate(...)`) are unchanged.
   */
  object PartialHandler {
    /**
     * Sugar to produce a PartialHandler from a PartialFunction. Successive calls to
     * isDefinedAt MUST return the same result. Otherwise, take the syntax hit and wire
     * up your own PartialHandler.
     */
    def apply[K, V](
      partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
    ): PartialHandler[K, V] = partial.lift

    /**
     * Composes two PartialHandlers. `thisHandler` is consulted first; `thatHandler` is only
     * evaluated when `thisHandler` returns None.
     */
    def compose[K, V](
      thisHandler: PartialHandler[K, V],
      thatHandler: PartialHandler[K, V]
    ): PartialHandler[K, V] = { cachedResult =>
      thisHandler(cachedResult) orElse thatHandler(cachedResult)
    }

    // Scala permits default arguments on at most one alternative of an overloaded method,
    // so the "default handler" variants below are explicit single-argument overloads.

    /**
     * Terminates a PartialHandler to produce a total Handler. Results the partial handler
     * returns None for are passed to `handler`.
     */
    def terminate[K, V](partial: PartialHandler[K, V], handler: Handler[K, V]): Handler[K, V] = {
      cachedResult => partial(cachedResult).getOrElse(handler(cachedResult))
    }

    /** Terminates a PartialHandler, falling back to `defaultHandler`. */
    def terminate[K, V](partial: PartialHandler[K, V]): Handler[K, V] =
      terminate(partial, defaultHandler[K, V])

    /** Lifts a PartialFunction to a PartialHandler and terminates it with `handler`. */
    def terminate[K, V](
      partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
      handler: Handler[K, V]
    ): Handler[K, V] = terminate(apply(partial), handler)

    /** Lifts a PartialFunction to a PartialHandler and terminates it with `defaultHandler`. */
    def terminate[K, V](
      partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
    ): Handler[K, V] = terminate(partial, defaultHandler[K, V])
  }

  /** A HandlerFactory that ignores the query and always returns `defaultHandler`. */
  def defaultHandlerFactory[Q <: Seq[K], K, V]: HandlerFactory[Q, K, V] = _ => defaultHandler[K, V]

  /**
   * This is the default Handler. Failures (fetch and (de)serialization) are treated as
   * misses, tombstones as not-found, and found values as hits. No expiry is applied.
   */
  def defaultHandler[K, V]: Handler[K, V] = {
    case NotFound(_) | Failed(_, _) => HandleAsMiss
    case DeserializationFailed(_) | SerializationFailed(_) => HandleAsMiss
    case CachedNotFound(_, _) | CachedDeleted(_, _) => HandleAsNotFound
    case CachedFound(_, value, _) => HandleAsFound(value)
  }

  /**
   * A PartialHandler that bubbles memcache failures up instead of converting
   * those failures to misses.
   */
  def failuresAreFailures[K, V]: PartialHandler[K, V] = PartialHandler[K, V] {
    case Failed(_, t) => HandleAsFailed(t)
  }

  // True when the entry cached at `cachedAt` is past its expiry time as of now.
  private[this] def isExpired(cachedAt: Time, ttl: Duration, expiry: Expiry): Boolean =
    expiry(cachedAt, ttl) < Time.now

  /**
   * Soft-expires CachedFound and CachedNotFound based on a ttl.
   *
   * @param ttl
   *   values older than this will be considered expired, but still
   *   returned, and asynchronously refreshed in cache.
   * @param expiry
   *   (optional) function to compute the expiry time
   * @return a PartialHandler that wraps the normal action in SoftExpiration for expired
   *   entries and returns None for everything else
   */
  def softTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedFound(_, value, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      SoftExpiration(HandleAsFound(value))
    case CachedNotFound(_, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      SoftExpiration(HandleAsNotFound)
  }

  /**
   * Hard-expires CachedFound and CachedNotFound based on a ttl.
   *
   * @param ttl
   *   values older than this will be considered a miss
   * @param expiry
   *   (optional) function to compute the expiry time
   * @return a PartialHandler that maps expired entries to HandleAsMiss and returns None
   *   for everything else
   */
  def hardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedFound(_, _, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      HandleAsMiss
    case CachedNotFound(_, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      HandleAsMiss
  }

  /**
   * Hard-expires a CachedNotFound tombstone based on a ttl. Unexpired tombstones are
   * handled as not-found.
   *
   * @param ttl
   *   values older than this will be considered expired
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def notFoundHardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedNotFound(_, cachedAt) =>
      if (isExpired(cachedAt, ttl, expiry)) HandleAsMiss else HandleAsNotFound
  }

  /**
   * Hard-expires a CachedDeleted tombstone based on a ttl. Unexpired tombstones are
   * handled as not-found.
   *
   * @param ttl
   *   values older than this will be considered expired
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def deletedHardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedDeleted(_, cachedAt) =>
      if (isExpired(cachedAt, ttl, expiry)) HandleAsMiss else HandleAsNotFound
  }

  /**
   * If shouldReadThrough evaluates to false for the query, CachedResults are returned
   * exclusively from cache, with no read-through to the underlying KeyValueRepository.
   * Otherwise, the readThroughHandler is used.
   */
  def gatedReadThrough[Q <: Seq[K], K, V](
    shouldReadThrough: Q => Boolean,
    readThroughHandler: Handler[K, V]
  ): HandlerFactory[Q, K, V] = { query =>
    if (shouldReadThrough(query)) readThroughHandler else cacheOnly[K, V]
  }

  /**
   * Reads only from cache and never falls back to the underlying KeyValueRepository:
   * found values are hits, everything else (misses, failures, tombstones) is not-found.
   */
  def cacheOnly[K, V]: Handler[K, V] = {
    case CachedFound(_, value, _) => HandleAsFound(value)
    case _ => HandleAsNotFound
  }
}
object CachingKeyValueRepository {
  /**
   * A function that takes a cachedAt time and ttl, and returns an expiry time. This function
   * _must_ be deterministic with respect to the arguments provided; otherwise you might get a
   * MatchError when using it with softTtlExpiration.
   */
  type Expiry = (Time, Duration) => Time

  /**
   * An Expiry function with an epsilon of zero: entries expire exactly `ttl` after `cachedAt`.
   */
  val fixedExpiry: Expiry = (cachedAt: Time, ttl: Duration) => cachedAt + ttl

  /**
   * A repeatable "random" expiry function that perturbs the ttl with a random value
   * no greater than +/-(ttl * maxFactor).
   *
   * @param maxFactor the maximum perturbation as a fraction of the ttl; 0 yields fixedExpiry
   */
  def randomExpiry(maxFactor: Float): Expiry = {
    if (maxFactor == 0) {
      fixedExpiry
    } else {
      (cachedAt: Time, ttl: Duration) => {
        // Seeding the generator with cachedAt makes the perturbation a pure function of the
        // arguments, which the Expiry contract requires.
        val rng = new Random(cachedAt.inMilliseconds)
        val factor = (2 * rng.nextFloat - 1) * maxFactor
        // A Float cannot be multiplied by a Duration, so scale the ttl's millisecond count.
        val jitterMillis = (factor.toDouble * ttl.inMilliseconds).toLong
        cachedAt + ttl + jitterMillis.milliseconds
      }
    }
  }

  /**
   * A function that implements a read-repair operation.
   */
  type ReadRepairer[Q, V] = (Q, Seq[V]) => Future[Seq[Try[V]]]

  /**
   * Implements the ReadRepairer type, but just throws an exception. Its parameter types are
   * maximally general so it can serve as the default ReadRepairer for any Q and V.
   */
  val noRepair: (Any, Seq[Any]) => Nothing =
    (_: Any, _: Seq[Any]) => throw new UnsupportedOperationException("no ReadRepairer configured")
}
import CachingKeyValueRepository._
/**
 * Reads keyed values from a LockingCache, and reads through to an underlying
 * KeyValueRepository for misses. Supports a "soft ttl", beyond which values are
 * still returned but are also re-read out-of-band from the originating request.
 *
 * @param underlying
 *   the underlying KeyValueRepository
 * @param cache
 *   the locking cache to read from
 * @param newQuery
 *   a function for converting a subset of the keys of the original query into a new
 *   query. This is used to construct the query passed to the underlying repository
 *   to fetch the cache misses.
 * @param handlerFactory
 *   produces, for each query, the handler that decides how every result from cache is
 *   treated (e.g. failures as misses or as failures, and how expiry applies)
 * @param picker
 *   used to choose between the value in cache and the value read from the DB when
 *   storing values in the cache
 * @param observer
 *   a CacheObserver for collecting cache statistics
 * @param readRepairer
 *   a function that can repair cached items. This is only meaningful if the handler
 *   can potentially return a ReadRepair action.
 */
class CachingKeyValueRepository[Q <: Seq[K], K, V](
  underlying: KeyValueRepository[Q, K, V],
  cache: LockingCache[K, Cached[V]],
  newQuery: SubqueryBuilder[Q, K],
  handlerFactory: CachedResult.HandlerFactory[Q, K, V] =
    CachedResult.defaultHandlerFactory[Q, K, V],
  picker: LockingCache.Picker[Cached[V]] = new PreferNewestCached[V],
  observer: CacheObserver = NullCacheObserver,
  readRepairer: ReadRepairer[Q, V] = noRepair)
  extends KeyValueRepository[Q, K, V]
{
  // Scala allows default arguments on at most one overloaded constructor, and the primary
  // constructor already has them. The deprecated PartialFunction-based constructors therefore
  // spell out each arity explicitly; each shorter one fills in the primary's default.

  @deprecated("Use HandlerFactory instead of PartialFunction", "")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]],
    observer: CacheObserver,
    readRepairer: ReadRepairer[Q, V]
  ) = this(
    underlying,
    cache,
    newQuery,
    // explicit parameter type: the constructor is overloaded, so no expected type is available
    (_: Q) =>
      CachedResult.PartialHandler.terminate(partialHandler, CachedResult.defaultHandler[K, V]),
    picker,
    observer,
    readRepairer
  )

  @deprecated("Use HandlerFactory instead of PartialFunction", "")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]],
    observer: CacheObserver
  ) = this(underlying, cache, newQuery, partialHandler, picker, observer, noRepair)

  @deprecated("Use HandlerFactory instead of PartialFunction", "")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]]
  ) = this(underlying, cache, newQuery, partialHandler, picker, NullCacheObserver)

  @deprecated("Use HandlerFactory instead of PartialFunction", "")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
  ) = this(underlying, cache, newQuery, partialHandler, new PreferNewestCached[V])

  import CachedResult._
  import CachedResultAction._

  protected[this] val log = Logger.get(getClass.getSimpleName)
  protected[this] val effectiveCacheStats = observer.scope("effective")

  /** The keys of one query, bucketed by the action the handler chose for them. */
  protected case class ProcessedCacheResult(
    hits: Map[K, V],
    misses: Seq[K],
    failures: Map[K, Throwable],
    tombstones: Set[K],
    softExpirations: Seq[K],
    repairs: Seq[(K, V)])

  /**
   * Looks up `keys` in cache, reads misses through to the underlying repository, read-repairs
   * entries the handler asked to repair, and merges everything into one result. Soft-expired
   * keys are refreshed in the background; that refresh does not delay or affect the result.
   */
  override def apply(keys: Q): Future[KeyValueResult[K, V]] = {
    getFromCache(keys) flatMap { cacheResult =>
      val ProcessedCacheResult(hits, misses, failures, tombstones, softExpirations, repairs) =
        process(keys, cacheResult)
      val (repairKeys, repairItems) = repairs.unzip

      recordCacheStats(keys, misses.toSet, softExpirations.toSet)

      // Start both foreground fetches before composing them so they proceed concurrently.
      val futureFromUnderlying = readThrough(newQuery(misses, keys))
      val futureFromRepair = readRepair(newQuery(repairKeys, keys), repairItems)

      // Out-of-band read-through for soft-expired keys. The caller already gets the cached
      // value, so the result only matters for its cache write, but failures are logged rather
      // than dropped.
      readThrough(newQuery(softExpirations, keys)) onFailure { case t =>
        log.error(t, "exception caught in soft-expiration read-through")
      }

      // merge all results together
      for {
        fromUnderlying <- futureFromUnderlying
        fromRepair <- futureFromRepair
        fromCache = KeyValueResult(hits, tombstones, failures)
      } yield KeyValueResult.sum(Seq(fromCache, fromUnderlying, fromRepair))
    }
  }

  /**
   * Fetches the distinct `keys` from cache. A failure of the whole cache call is logged and
   * converted into a result in which every key failed with that exception.
   */
  protected[this] def getFromCache(keys: Seq[K]): Future[KeyValueResult[K, Cached[V]]] = {
    val uniqueKeys = keys.distinct
    cache.get(uniqueKeys) handle { case t =>
      log.error(t, "exception caught in cache get")
      // treat total cache failure as a fetch that returned all failures
      KeyValueResult(failed = uniqueKeys.map(_ -> t).toMap)
    }
  }

  /**
   * Buckets cache results according to the wishes of the handler produced by
   * `handlerFactory` for this query.
   */
  protected[this] def process(
    keys: Q,
    cacheResult: KeyValueResult[K, Cached[V]]
  ): ProcessedCacheResult = {
    val cachedResultHandler = handlerFactory(keys)
    val hits = new mutable.HashMap[K, V]
    val misses = new mutable.ListBuffer[K]
    val failures = new mutable.HashMap[K, Throwable]
    val tombstones = new mutable.ListBuffer[K]
    val softExpiredKeys = new mutable.ListBuffer[K]
    val repairs = new mutable.ListBuffer[(K, V)]

    // Classifies the raw cache lookup for `key` as a CachedResult.
    def toCachedResult(key: K): CachedResult[K, V] = cacheResult(key) match {
      case Throw(t) => Failed(key, t)
      case Return(None) => NotFound(key)
      case Return(Some(cached)) => cached.status match {
        case CachedValueStatus.Found => cached.value match {
          case None => NotFound(key)
          case Some(value) => CachedFound(key, value, cached.cachedAt)
        }
        case CachedValueStatus.NotFound => CachedNotFound(key, cached.cachedAt)
        case CachedValueStatus.Deleted => CachedDeleted(key, cached.cachedAt)
        case CachedValueStatus.SerializationFailed => SerializationFailed(key)
        case CachedValueStatus.DeserializationFailed => DeserializationFailed(key)
        case CachedValueStatus.Evicted => NotFound(key)
      }
    }

    // Records `action` for `key` in the matching bucket. SoftExpiration marks the key as
    // soft-expired and then applies the action it wraps.
    def processAction(key: K, action: CachedResultAction[V]): Unit = action match {
      case HandleAsMiss => misses += key
      case HandleAsFound(value) => hits(key) = value
      case HandleAsNotFound => tombstones += key
      case HandleAsFailed(t) => failures(key) = t
      case ReadRepair(value) => repairs += (key -> value)
      case SoftExpiration(subaction) =>
        softExpiredKeys += key
        processAction(key, subaction)
    }

    // getFromCache only looks up distinct keys, so each distinct key is bucketed once.
    // Otherwise a duplicated key would be read through or repaired once per occurrence.
    for (key <- keys.distinct) {
      processAction(key, cachedResultHandler(toCachedResult(key)))
    }

    ProcessedCacheResult(
      hits.toMap,
      misses,
      failures.toMap,
      tombstones.toSet,
      softExpiredKeys,
      repairs)
  }

  /**
   * Records one hit or miss per requested key. For `effectiveCacheStats`, soft-expired keys
   * also count as misses; for `observer`, only `notFound` keys do.
   */
  protected[this] def recordCacheStats(keys: Seq[K], notFound: Set[K], expired: Set[K]): Unit = {
    keys foreach { key =>
      if (notFound.contains(key) || expired.contains(key))
        effectiveCacheStats.miss(key.toString)
      else
        effectiveCacheStats.hit(key.toString)
      if (notFound.contains(key))
        observer.miss(key.toString)
      else
        observer.hit(key.toString)
    }
  }

  /**
   * Reads through to the underlying repository and, on success, writes the results to cache.
   *
   * @param keys
   *   the keys to read; an empty query short-circuits to an empty result
   */
  def readThrough(keys: Q): Future[KeyValueResult[K, V]] = {
    if (keys.isEmpty) {
      KeyValueResult.emptyFuture
    } else {
      underlying(keys) onSuccess { result =>
        writeToCache(keys, result)
      }
    }
  }

  /**
   * Read-repairs the specified items, writing the updated results back to cache.
   * An empty query short-circuits to an empty result.
   */
  def readRepair(keys: Q, items: Seq[V]): Future[KeyValueResult[K, V]] = {
    if (keys.isEmpty) {
      KeyValueResult.emptyFuture
    } else {
      KeyValueResult.fromSeqTry(keys) {
        readRepairer(keys, items)
      } onSuccess { result =>
        writeToCache(keys, result)
      }
    }
  }

  /**
   * Writes the contents of the given KeyValueResult to cache. Found values are cached as
   * Found, absent values as a NotFound tombstone, and failed keys are not written.
   */
  def writeToCache(keys: Q, result: KeyValueResult[K, V]): Unit = {
    // All entries written by one call share a single timestamp. The val is lazy, so the
    // clock is only read if something is actually cached.
    lazy val cachedEmpty = {
      val now = Time.now
      Cached[V](None, CachedValueStatus.NotFound, now, Some(now))
    }
    keys foreach { key =>
      // only cache Returns from the underlying repo, skip Throws
      val toCache: Option[Cached[V]] = result(key) match {
        case Return(Some(value)) =>
          Some(cachedEmpty.copy(value = Some(value), status = CachedValueStatus.Found))
        case Return(None) => Some(cachedEmpty)
        case Throw(_) => None
      }
      toCache foreach { cached =>
        cache.lockAndSet(key, LockingCache.PickingHandler(cached, picker)) onFailure { case t =>
          log.error(t, "exception caught in lockAndSet")
        }
      }
    }
  }
}
@evnm
Copy link

evnm commented May 29, 2012

I'm into this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment