Skip to content

Instantly share code, notes, and snippets.

@mhseiden
Last active August 29, 2015 14:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhseiden/bbe66815c845e0c770c7 to your computer and use it in GitHub Desktop.
Save mhseiden/bbe66815c845e0c770c7 to your computer and use it in GitHub Desktop.
Platfora Promise Cache Gist - Max Seiden
// to use these classes in the Scala REPL (2.11), use the `scala -i { gist-file.scala }` option
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.io.{FileWriter, File}
import java.util.concurrent.{ThreadFactory, Executors}
import scala.concurrent.{Await, Future, ExecutionContext, Promise}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.io.Source
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Try
/**
 * A trait that defines a cache which loads entries asynchronously and uses Future[V]
 * for (externally facing) synchronization. This lets concurrent consumers register
 * callbacks that execute when a resource has finished loading (or immediately, if the
 * resource has already been loaded).
 *
 * Internally, this is implemented with a mapping from K => Promise[V]. This decouples
 * entry bookkeeping (the EvictionPolicy) from loading and disposing of entries (the
 * EntryManager): since the cache only needs to hand out the Future[V] of an entry's
 * Promise[V], the actual state of an entry's load is generally not a concern of the
 * cache's entry management.
 *
 * Note that this trait makes no guarantees of internal thread-safety;
 * it is up to implementations to synchronize access to internal data structures.
 *
 * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
 * @tparam V the type of the entries in the cache map
 */
trait AsyncCache[K, V] {
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Public cache-access methods
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

  /** Returns the Future of the entry currently cached for `key`, if there is one. */
  def get(key: K): Option[Future[V]]

  /** Returns the Future of the entry for `key`; implementations decide how absent keys are loaded. */
  def load(key: K): Future[V]

  /** Removes the entry for `key` from the cache. */
  def evict(key: K): Unit

  /** Removes every entry from the cache. */
  def clear(): Unit

  /** The number of entries currently held by the cache. */
  def size(): Int

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Ordered hash map for managing entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

  // A LinkedHashMap keeps insertion order, which ordering-based eviction policies can rely on.
  protected val mapping: mutable.LinkedHashMap[K, Promise[V]] =
    new mutable.LinkedHashMap[K, Promise[V]]()

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // Definitions for the EntryManager and EvictionPolicy instances
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  protected val manager: EntryManager
  protected val policy: EvictionPolicy

  /** Maximum number of entries, or None for an unbounded cache. */
  protected val limit: Option[Int]

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // An eviction policy for cache entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  protected trait EvictionPolicy {
    /** Records that `key` was accessed. */
    def refresh(key: K): Unit

    /** Whether another entry can be added without evicting one first. */
    def hasSpace: Boolean

    /** The key that should be evicted next, if any. */
    def nextToEvict: Option[K]
  }

  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  // An entry manager for loading and evicting entries
  // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
  protected trait EntryManager {
    /** Starts loading the value for `key` and returns the promise that will hold it. */
    def load(key: K): Promise[V]

    /** Releases any resources associated with `key` after it leaves the cache. */
    def evict(key: K): Unit
  }
}
/**
 * A non-thread-safe Least Recently Used eviction policy that treats the head of the
 * LinkedHashMap as the next candidate for eviction. The head is used instead of the
 * tail because a put into the LinkedHashMap appends entries. It follows that
 * refreshing an entry removes it from its current position and re-appends it to the
 * end of the map.
 *
 * @tparam K the type of the keys for the cache map
 * @tparam V the type of the values for the cache map
 */
trait LruEvictionPolicy[K, V] extends AsyncCache[K, V] {
  protected val policy: EvictionPolicy = new EvictionPolicy {
    /** Moves `key` (if present) to the most-recently-used end of the map. */
    def refresh(key: K): Unit =
      mapping.remove(key).foreach(entry => mapping.put(key, entry))

    /** The least recently used key, but only once the cache has no space left. */
    def nextToEvict: Option[K] =
      if (hasSpace) None else mapping.headOption.map { case (key, _) => key }

    /** True when there is no limit, or when the map holds fewer entries than the limit. */
    def hasSpace: Boolean =
      limit.forall(max => mapping.size < max)
  }
}
/**
 * A non-thread-safe default cache implementation that uses a simple, default strategy
 * for managing cache entries. Note that entries which encounter an exception while
 * loading are not evicted automatically; it is up to the caller, or to an overriding
 * implementation, to perform the eviction.
 *
 * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
 * @tparam V the type of the entries in the cache map
 */
trait DefaultCache[K, V] extends AsyncCache[K, V] {
  /** Returns the Future of the entry for `key`, if present. Does not call `policy.refresh`. */
  def get(key: K): Option[Future[V]] =
    mapping.get(key).map(_.future)

  /**
   * Returns the Future of the entry for `key`.
   *
   * An existing entry is refreshed via the eviction policy. Otherwise, if the policy reports
   * no space, at most one entry (the policy's `nextToEvict`) is evicted. A new entry is then
   * obtained from the entry manager and recorded in the map.
   */
  def load(key: K): Future[V] = mapping.get(key) match {
    case Some(prior) =>
      policy.refresh(key)
      prior.future
    case None =>
      if (!policy.hasSpace) {
        policy.nextToEvict.foreach(evict)
      }
      val entry = manager.load(key)
      mapping.put(key, entry)
      entry.future
  }

  /**
   * Removes the entry for `key`, if present. Its promise is failed (when still pending) so that
   * waiting consumers observe the eviction, and the entry manager is notified. A no-op for
   * absent keys.
   */
  def evict(key: K): Unit =
    mapping.remove(key).foreach { entry =>
      entry.tryFailure(new RuntimeException(s"Entry $key was evicted from the cache: $this!"))
      manager.evict(key)
    }

  /** Evicts every entry, as if `evict` had been called for each key. */
  def clear(): Unit = {
    // Snapshot the keys first: `evict` removes from `mapping`, and the collections library
    // does not support mutating a map while iterating it (entries may be skipped).
    mapping.keys.toList.foreach(evict)
    // Defensive: the map should already be empty at this point.
    mapping.clear()
  }

  def size(): Int = mapping.size
}
/**
 * A helper that uses syntactic sugar to create a simple, thread-safe wrapper for
 * an AsyncCache instance. Note that it may not provide optimal performance in
 * write-heavy workloads; its primary goal is to provide generic thread-safety
 * for all AsyncCache implementations. Finer-grained synchronization is left
 * as an exercise for custom AsyncCache implementations.
 *
 * usage:
 *
 *   import ThreadSafeCache._
 *   val cache = new MyAsyncCache[K,V]().threadsafe
 *
 */
object ThreadSafeCache {
  /** Adds `.threadsafe` to any AsyncCache; a cache that is already wrapped is returned as-is. */
  final implicit class ThreadsafeCachePimp[K, V](cache: AsyncCache[K, V]) {
    def threadsafe: AsyncCache[K, V] = cache match {
      case wrapped: ThreadSafeCache[K, V] => wrapped
      case _                              => new ThreadSafeCache[K, V](cache)
    }
  }

  /**
   * A thread-safe AsyncCache wrapper for an underlying cache implementation.
   * A (fair) ReentrantReadWriteLock is used instead of this.synchronized(...) to reduce
   * contention for read-heavy workloads.
   *
   * @param underlying a non-thread-safe AsyncCache instance
   * @tparam K the type of the keys that map to the Promise[V] entries in the cache map
   * @tparam V the type of the entries in the cache map
   */
  private final class ThreadSafeCache[K, V](underlying: AsyncCache[K, V]) extends AsyncCache[K, V] {
    private val rwLock = new ReentrantReadWriteLock(true)
    private val rLock = rwLock.readLock()
    private val wLock = rwLock.writeLock()

    // get/size take the shared read lock, which assumes the underlying implementations do not
    // mutate state (true for DefaultCache) -- confirm for other AsyncCache implementations.
    override def get(key: K): Option[Future[V]] = read(underlying.get(key))
    override def load(key: K): Future[V] = write(underlying.load(key))
    override def evict(key: K): Unit = write(underlying.evict(key))
    override def clear(): Unit = write(underlying.clear())
    override def size(): Int = read(underlying.size())

    /** Evaluates `f` while holding the exclusive write lock. */
    private def write[T](f: => T): T = {
      // Acquire before entering `try`: if lock() itself failed, the finally block would
      // otherwise call unlock() on a lock this thread does not hold, masking the real error.
      wLock.lock()
      try f
      finally wLock.unlock()
    }

    /** Evaluates `f` while holding the shared read lock. */
    private def read[T](f: => T): T = {
      rLock.lock()
      try f
      finally rLock.unlock()
    }

    // The wrapper delegates every operation to `underlying`, so its own manager, policy and
    // limit are never used; they exist only to satisfy AsyncCache's abstract members.
    protected val manager: EntryManager = null
    protected val policy: EvictionPolicy = null
    protected val limit: Option[Int] = None
  }
}
object Examples {
  import ThreadSafeCache._

  // a trait which defines a cache that maps files to arrays of strings (ex: lines from the files)
  trait FileContentCache
    extends AsyncCache[File, Array[String]]
    with DefaultCache[File, Array[String]]
    with LruEvictionPolicy[File, Array[String]]

  // a FileContentCache that loads files in the same thread as the caller
  final class SerialFileContentCache(val limit: Option[Int]) extends FileContentCache {
    protected val manager: EntryManager = new EntryManager {
      def evict(key: File): Unit = {
        /* noop */
      }
      def load(key: File): Promise[Array[String]] =
        Promise.fromTry(Try(readLines(key)))
    }
  }

  // a FileContentCache that uses a thread pool to load files asynchronously
  final class ThreadPoolFileContentCache(val limit: Option[Int]) extends FileContentCache {
    // The pool size follows the cache limit (default 8) but is clamped to at least one thread,
    // since newFixedThreadPool rejects non-positive sizes. The pool is never shut down, so its
    // threads are daemons and do not keep the JVM alive.
    private val pool = Executors.newFixedThreadPool(
      math.max(1, limit.getOrElse(8)),
      new ThreadFactory {
        private val counter = new AtomicInteger(0)
        def newThread(task: Runnable): Thread = {
          val thread = new Thread(task, s"file-cache-loader-${counter.incrementAndGet()}")
          thread.setDaemon(true)
          thread
        }
      })
    private implicit val ctx: ExecutionContext = ExecutionContext.fromExecutor(pool)

    protected val manager: EntryManager = new EntryManager {
      def evict(key: File): Unit = {
        /* noop */
      }
      def load(key: File): Promise[Array[String]] =
        mkPromise { readLines(key) }
      private def mkPromise(f: => Array[String]): Promise[Array[String]] =
        Promise[Array[String]]().completeWith(Future(f))
    }
  }

  def main(argv: Array[String]): Unit = {
    (0 until 10).foreach(mkFile)
    val limit = Some(2)
    runFileLoadExample(new SerialFileContentCache(limit))
    runFileLoadExample(new ThreadPoolFileContentCache(limit))
    Thread.sleep(1.second.toMillis) // wait for stdout to flush to the console before exiting
    System.exit(0)
  }

  // a helper to run the file-load example using a given FileContentCache instance
  private def runFileLoadExample(cache: FileContentCache): Unit = {
    import ExecutionContext.Implicits._ // use the global execution context for the result callback
    println(s"\nRunning file load example with cache: $cache")
    // using a threadsafe cache, for the sake of giving an example
    val tsCache = cache.threadsafe
    tsCache.load(fileFor(1)) // load the 1st entry
    tsCache.load(fileFor(2)) // load the 2nd entry
    tsCache.load(fileFor(3)) // evict the 1st entry, load the 3rd entry
    tsCache.load(fileFor(2)) // touch the 2nd entry
    tsCache.load(fileFor(4)) // evict the 3rd entry, load the 4th entry
    // asynchronously get the lines from the cache and compare them to the actual lines in the file;
    // the lookup also goes through the thread-safe wrapper rather than the raw cache
    val cached = tsCache.get(fileFor(2)).getOrElse(
      Future.failed(new NoSuchElementException(s"${fileFor(2)} is not cached")))
    val result = cached.map { cacheLines =>
      val filesLines = readLines(fileFor(2))
      for ((l1, l2) <- cacheLines.zip(filesLines)) {
        println(s"\t[${Thread.currentThread().getName}] $l1 =?= $l2")
      }
    }
    Await.ready(result, 1.second)
  }

  // the path of the i-th example file
  private def fileFor(i: Int): File = new File("/tmp", s"file-$i.txt")

  // reads all lines of `file`, closing the underlying Source even if reading fails
  private def readLines(file: File): Array[String] = {
    val source = Source.fromFile(file)
    try source.getLines().toArray
    finally source.close()
  }

  // a helper to make a named file with 10 lines
  private def mkFile(i: Int): Unit = {
    val f = fileFor(i)
    f.createNewFile()
    val writer = new FileWriter(f)
    try {
      for (j <- 0 until 10) {
        writer.write(s"File $i : Line $j\n")
      }
    } finally {
      writer.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment