-
-
Save mhseiden/bbe66815c845e0c770c7 to your computer and use it in GitHub Desktop.
Platfora Promise Cache Gist - Max Seiden
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// to use these classes in the Scala REPL (2.11), use the `scala -i { gist-file.scala }` option | |
import java.util.concurrent.locks.ReentrantReadWriteLock | |
import java.io.{FileWriter, File} | |
import java.util.concurrent.{ThreadFactory, Executors} | |
import scala.concurrent.{Await, Future, ExecutionContext, Promise} | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.io.Source | |
import java.util.concurrent.atomic.AtomicInteger | |
import scala.util.Try | |
/**
 * A trait that defines a cache which loads entries asynchronously and uses Future[V]
 * for (externally facing) synchronization. This enables concurrent consumers to
 * register callbacks that will execute when a resource has finished loading (or
 * immediately if the resource has already been loaded). Internally, this is
 * implemented with a mapping from K => Promise[V]. This further decouples the entry
 * management (ex: eviction policy) from the entry loading and eviction (ex: entry manager).
 * Since the cache itself only needs to return the Future[V] associated with an entry's
 * Promise[V], the actual state of the entry load is generally not a concern of the
 * cache entry management.
 *
 * Note that this trait makes no guarantees of internal thread-safety;
 * it is up to implementations to synchronize access to internal data structures.
 *
 * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
 * @tparam V the type of the entries in the cache map
 */
trait AsyncCache[K, V] {
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Public cache-access methods
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

  /** Looks up the future for `key`; `None` when no entry for `key` is held. */
  def get(key: K): Option[Future[V]]

  /** Returns the future for `key`, creating (loading) the entry when it is not held. */
  def load(key: K): Future[V]

  /** Removes the entry for `key` from the cache, if present. */
  def evict(key: K): Unit

  /** Removes every entry from the cache. */
  def clear(): Unit

  /** The number of entries currently held by the cache. */
  def size(): Int

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Ordered hash map for managing entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

  // Key -> promise of the entry's value. LinkedHashMap keeps insertion order,
  // which ordering-based eviction policies (e.g. LRU) can rely on.
  protected val mapping = new mutable.LinkedHashMap[K, Promise[V]]()

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Definitions for the EntryManager and EvictionPolicy instances
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

  /** Creates and disposes of entries. */
  protected val manager: EntryManager

  /** Decides when and which entries are evicted. */
  protected val policy: EvictionPolicy

  /** Optional upper bound on the number of entries, as interpreted by the eviction policy. */
  protected val limit: Option[Int]

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // An eviction policy for cache entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  protected trait EvictionPolicy {
    /** Notifies the policy that `key` was accessed. */
    def refresh(key: K): Unit

    /** Whether another entry can be added without evicting one first. */
    def hasSpace: Boolean

    /** The key that should be evicted next, if any. */
    def nextToEvict: Option[K]
  }

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // An entry manager for loading and evicting entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  protected trait EntryManager {
    /** Starts loading `key` and returns the promise that will hold its value. */
    def load(key: K): Promise[V]

    /** Hook for releasing anything held for `key` once its entry has been evicted. */
    def evict(key: K): Unit
  }
}
/**
 * A non-thread-safe Least Recently Used eviction policy.
 *
 * Entries live in an insertion-ordered LinkedHashMap, and a put appends new keys at
 * the tail. The head of the map is therefore the least recently used key and is the
 * next candidate for eviction. Refreshing a key removes it and re-inserts it, which
 * moves it to the tail.
 *
 * @tparam K the type of the keys for the cache map
 * @tparam V the type of the values for the cache map
 */
trait LruEvictionPolicy[K, V] extends AsyncCache[K, V] {
  protected val policy: EvictionPolicy = new EvictionPolicy {
    // The map is full once its size reaches the limit; no limit means it is never full.
    def hasSpace: Boolean = limit match {
      case Some(max) => mapping.size < max
      case None      => true
    }

    // Only nominate a victim when the map is full; the oldest key sits at the head.
    def nextToEvict: Option[K] =
      if (hasSpace) None
      else mapping.keys.headOption

    // Re-inserting is required: put on an existing key would leave its position unchanged.
    def refresh(key: K): Unit =
      for (promise <- mapping.remove(key)) mapping.put(key, promise)
  }
}
/**
 * A non-thread-safe default cache implementation that uses a simple strategy for
 * managing cache entries:
 *   - a hit refreshes the entry in the eviction policy;
 *   - a miss evicts at most one entry (chosen by the policy) before loading the new one.
 *
 * Note that entries which encounter an exception while loading will not be evicted
 * automatically; it is up to the caller or an overriding implementation to perform
 * the eviction.
 *
 * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
 * @tparam V the type of the entries in the cache map
 */
trait DefaultCache[K, V] extends AsyncCache[K, V] {
  /** Returns the future of the entry for `key`, if held. Neither loads nor refreshes it. */
  def get(key: K): Option[Future[V]] = mapping.get(key).map(_.future)

  /**
   * Returns the future of the entry for `key`.
   * On a hit, the entry is refreshed in the eviction policy.
   * On a miss, one entry is evicted if the policy reports no space. A new entry is then
   * obtained from the entry manager and stored.
   */
  def load(key: K): Future[V] = mapping.get(key) match {
    case Some(prior) =>
      policy.refresh(key)
      prior.future
    case None =>
      // At most one entry is evicted per load.
      if (!policy.hasSpace) policy.nextToEvict.foreach(evict)
      val entry = manager.load(key)
      mapping.put(key, entry)
      entry.future
  }

  /**
   * Removes the entry for `key` and notifies the entry manager; a no-op if `key` is not held.
   * A still-pending entry is failed, so consumers waiting on its future observe the eviction
   * instead of waiting forever. Already-completed entries are left untouched by `tryFailure`.
   */
  def evict(key: K): Unit = mapping.remove(key).foreach { entry =>
    entry.tryFailure(new RuntimeException(s"Entry $key was evicted from the cache: $this!"))
    manager.evict(key)
  }

  /** Evicts every entry (failing pending ones and notifying the entry manager). */
  def clear(): Unit = {
    // Snapshot the keys first: `evict` removes from `mapping`, and mutating a collection
    // while iterating over it is undefined behaviour.
    mapping.keys.toList.foreach(evict)
    mapping.clear()
  }

  /** The number of entries currently held. */
  def size(): Int = mapping.size
}
/**
 * A helper that uses syntactic sugar to create a simple, thread-safe wrapper for
 * an AsyncCache instance. Note that it may not provide optimal performance in
 * write-heavy workloads; its primary goal is to provide generic thread-safety
 * for all AsyncCache implementations. Finer-grained synchronization is left
 * as an exercise for custom AsyncCache implementations.
 *
 * usage:
 *
 *   import ThreadSafeCache._
 *   val cache = new MyAsyncCache[K,V]().threadsafe
 *
 */
object ThreadSafeCache {
  final implicit class ThreadsafeCachePimp[K, V](cache: AsyncCache[K, V]) {
    /** Wraps `cache` in a ThreadSafeCache, unless it is already one (avoids double-locking). */
    def threadsafe: AsyncCache[K, V] = cache match {
      case alreadySafe: ThreadSafeCache[K, V] => alreadySafe
      case _                                  => new ThreadSafeCache[K, V](cache)
    }
  }

  /**
   * A thread-safe AsyncCache wrapper for an underlying cache implementation.
   * A fair ReentrantReadWriteLock is used instead of this.synchronized(...) to reduce
   * contention for read-heavy workloads.
   *
   * NOTE(review): `get` and `size` run under the shared read lock. This assumes the
   * underlying implementation does not mutate state in those methods. That holds for
   * DefaultCache, whose `get`/`size` only read `mapping`.
   *
   * @param underlying a non-thread-safe AsyncCache instance
   * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
   * @tparam V the type of the entries in the cache map
   */
  private final class ThreadSafeCache[K, V](underlying: AsyncCache[K, V]) extends AsyncCache[K, V] {
    private val rwLock = new ReentrantReadWriteLock(true)
    private val rLock = rwLock.readLock()
    private val wLock = rwLock.writeLock()

    override def get(key: K): Option[Future[V]] = read(underlying.get(key))
    override def load(key: K): Future[V] = write(underlying.load(key))
    override def evict(key: K): Unit = write(underlying.evict(key))
    override def clear(): Unit = write(underlying.clear())
    override def size(): Int = read(underlying.size())

    // The lock is acquired *before* `try`. If acquisition itself failed, the `finally`
    // must not unlock a lock this thread does not hold. Doing so would throw
    // IllegalMonitorStateException and mask the original error.
    private def write[T](f: => T): T = {
      wLock.lock()
      try f
      finally wLock.unlock()
    }

    private def read[T](f: => T): T = {
      rLock.lock()
      try f
      finally rLock.unlock()
    }

    // Every operation is delegated to `underlying`, so this wrapper never uses its own
    // manager, policy, limit or inherited `mapping`. The placeholders only satisfy the
    // abstract members. todo: there's probably a cleaner way to handle this.
    protected val manager: EntryManager = null
    protected val policy: EvictionPolicy = null
    protected val limit: Option[Int] = None
  }
}
object Examples {
  import ThreadSafeCache._

  // directory that holds the generated example files
  private val ExampleDir = "/tmp"

  // a trait which defines a cache that maps files to arrays of strings (ex: lines from the files)
  trait FileContentCache
    extends AsyncCache[File, Array[String]]
    with DefaultCache[File, Array[String]]
    with LruEvictionPolicy[File, Array[String]]

  // a FileContentCache that loads files in the same thread as the caller
  final class SerialFileContentCache(val limit: Option[Int]) extends FileContentCache {
    protected val manager: EntryManager = new EntryManager {
      def evict(key: File): Unit = {
        /* noop */
      }

      // Reads eagerly on the caller's thread; a read failure becomes a failed promise.
      def load(key: File): Promise[Array[String]] =
        Promise.fromTry(Try(readLines(key)))
    }
  }

  // a FileContentCache that uses a thread pool to load files asynchronously
  final class ThreadPoolFileContentCache(val limit: Option[Int]) extends FileContentCache {
    // newFixedThreadPool rejects sizes < 1, so a limit of 0 must still yield one thread.
    private val pool = Executors.newFixedThreadPool(math.max(1, limit.getOrElse(8)))
    private implicit val ctx: ExecutionContext = ExecutionContext.fromExecutor(pool)

    protected val manager: EntryManager = new EntryManager {
      def evict(key: File): Unit = {
        /* noop */
      }

      // The read runs on `pool`; the returned promise completes when it finishes.
      def load(key: File): Promise[Array[String]] =
        Promise[Array[String]]().completeWith(Future(readLines(key)))
    }
  }

  def main(argv: Array[String]): Unit = {
    (0 until 10).foreach(mkFile)
    val limit = Some(2)
    runFileLoadExample(new SerialFileContentCache(limit))
    runFileLoadExample(new ThreadPoolFileContentCache(limit))
    Thread.sleep(1.second.toMillis) // wait for stdout to flush to the console before exiting
    // explicit exit: the loader pool's threads are not daemons and would keep the JVM alive
    System.exit(0)
  }

  // a helper to run the file-load example using a given FileContentCache instance
  private def runFileLoadExample(cache: FileContentCache): Unit = {
    import ExecutionContext.Implicits._ // use the global execution context for the result callback
    println(s"\nRunning file load example with cache: $cache")
    // using a threadsafe cache, for the sake of giving an example
    val tsCache = cache.threadsafe
    tsCache.load(exampleFile(1)) // load the 1st entry
    tsCache.load(exampleFile(2)) // load the 2nd entry
    tsCache.load(exampleFile(3)) // evict the 1st entry, load the 3rd entry
    tsCache.load(exampleFile(2)) // touch the 2nd entry
    tsCache.load(exampleFile(4)) // evict the 3rd entry, load the 4th entry
    // asynchronously get the lines from the cache and compare them to the actual lines in the file
    val file2 = exampleFile(2)
    val cached = tsCache.get(file2).getOrElse(
      throw new IllegalStateException(s"Entry $file2 was expected to still be cached"))
    val result = cached.map { cacheLines =>
      val fileLines = readLines(file2)
      for ((l1, l2) <- cacheLines.zip(fileLines)) {
        println(s"\t[${Thread.currentThread().getName}] $l1 =?= $l2")
      }
    }
    Await.ready(result, 1.second)
  }

  // the path of the i-th example file
  private def exampleFile(i: Int): File = new File(ExampleDir, s"file-$i.txt")

  // reads every line of `file`, closing the underlying Source even if reading fails
  private def readLines(file: File): Array[String] = {
    val source = Source.fromFile(file)
    try source.getLines().toArray
    finally source.close()
  }

  // a helper to make a named file with 10 lines (FileWriter creates or truncates the file)
  private def mkFile(i: Int): Unit = {
    val writer = new FileWriter(exampleFile(i))
    try {
      for (j <- 0 until 10) {
        writer.write(s"File $i : Line $j\n")
      }
    } finally {
      writer.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment