Skip to content

Instantly share code, notes, and snippets.

@sritchie
Created January 22, 2013 20:31
Show Gist options
  • Save sritchie/cb28fa90eedce4647cb6 to your computer and use it in GitHub Desktop.
Save sritchie/cb28fa90eedce4647cb6 to your computer and use it in GitHub Desktop.
Convert to a store.
package com.twitter.summingbird.store
import com.twitter.algebird.Monoid
import com.twitter.util.Future
import com.twitter.summingbird.batch.BatchID
/**
* @author Oscar Boykin
* @author Sam Ritchie
*
* The CommittingOnlineStore wraps a ConcurrentMutableStore in a way that allows for batched
* updates. The CommittingOnlineStore uses its Monoid[Value] to pre-aggregate key-value pairs
* into an internal MapStore[Key,Value], only committing out to the wrapped mutable store
* when the size of the internal store grows beyond a specified size (maxKeys).
*
* On read, the CommittingOnlineStore reads a value out of both the backing mutable store
* and the internal store and merges them together with the Monoid[Value]. On system failure,
* this store will fail to commit at most (maxKeys - 1) total key-value pairs.
*/
object CommittingOnlineStore {
  /**
   * Factory for [[CommittingOnlineStore]], so callers can build one without `new`.
   *
   * @tparam StoreType type of the wrapped store
   * @tparam Key       key type; the store itself is keyed on (Key, BatchID) pairs
   * @tparam Value     value type; a Monoid[Value] must be available implicitly
   * @param maxKeys        forwarded unchanged to the CommittingOnlineStore constructor
   * @param committedStore forwarded unchanged to the CommittingOnlineStore constructor
   * @return a new CommittingOnlineStore built from the two arguments
   */
  def apply[StoreType <: ConcurrentMutableStore[StoreType, (Key, BatchID), Value], Key, Value: Monoid](
    maxKeys: Int,
    committedStore: StoreType
  ): CommittingOnlineStore[StoreType, Key, Value] =
    new CommittingOnlineStore[StoreType, Key, Value](maxKeys, committedStore)
}
/**
 * Store that stages increments in an in-memory MapStore and pushes them into
 * the wrapped committedStore once the staging store holds at least maxKeys
 * entries. Reads consult both stores and merge the results with Monoid[Value].
 *
 * NOTE(review): `stagingStoreFuture` is a plain var that `increment` reads and
 * then reassigns without synchronization; confirm the intended threading model
 * before calling `increment` from several threads.
 *
 * NOTE(review): if a commit future fails, that failed future is what gets
 * stored in `stagingStoreFuture`, so later `get`/`increment` calls chain onto
 * the failure. Confirm whether a recovery path is wanted.
 *
 * @tparam StoreType type of the wrapped store that receives committed values
 * @tparam Key       key type; all stores here are keyed on (Key, BatchID) pairs
 * @tparam Value     value type, combined through its Monoid
 * @param maxKeys        staging-store size at (or above) which a commit is triggered
 * @param committedStore store that receives the pre-aggregated values
 */
class CommittingOnlineStore[StoreType <: ConcurrentMutableStore[StoreType, (Key, BatchID), Value], Key, Value: Monoid]
(maxKeys: Int, committedStore: StoreType)
extends Store[CommittingOnlineStore[StoreType, Key, Value], (Key, BatchID), Value] {
  // The self-type argument above must be the fully applied class type:
  // incStore's bound (S <: Store[S, ...]) shows Store expects a proper type
  // there, not the bare CommittingOnlineStore type constructor.

  /** Current staging store; replaced by every call to `increment`. */
  var stagingStoreFuture: Future[MapStore[(Key, BatchID), Value]] =
    Future.value(new MapStore[(Key, BatchID), Value])

  /**
   * Looks the key up in both the staging store and committedStore.
   *
   * @param batchedKey the (Key, BatchID) pair to read
   * @return the Monoid sum when both stores hold a value, otherwise whichever
   *         value is present, or None when neither store has the key
   */
  override def get(batchedKey: (Key, BatchID)): Future[Option[Value]] =
    stagingStoreFuture flatMap { stagingStore =>
      stagingStore.get(batchedKey)
        .join(committedStore.get(batchedKey))
        .map {
          case (Some(staged), Some(committed)) => Some(Monoid.plus(staged, committed))
          case (None, committed) => committed
          case (staged, None) => staged
        }
    }

  /**
   * Updates key `k` in store `s` with a function from Option[Value] to
   * Option[Value].
   *
   * When the key already has a value, `v` is monoid-added to it; otherwise `v`
   * itself is used. The result is passed through Monoid.nonZeroOption, so a
   * monoid zero is handed to `update` as None (which, per the original
   * author's note, removes the key from the store).
   *
   * @param s store to update
   * @param k batched key to update
   * @param v value to add
   * @return the future returned by `s.update`
   */
  protected def incStore[S <: Store[S, (Key, BatchID), Value]](s: S, k: (Key, BatchID), v: Value): Future[S] =
    s.update(k) { oldV =>
      Monoid.nonZeroOption(oldV.map { Monoid.plus(_, v) } getOrElse v)
    }

  /**
   * Sinks every key-value pair of the supplied staging store into committedStore.
   *
   * The staging store is a MapStore, so its backing map is accessible; each
   * entry is turned into one `incStore` call against committedStore.
   *
   * @param mapStore staging store whose contents should be committed
   * @return a future built by Future.join over all per-entry update futures
   */
  def commit(mapStore: MapStore[(Key, BatchID), Value]): Future[Unit] = {
    // One future per entry, each responsible for committing a single
    // key-value pair into the committedStore.
    val commitFutures = mapStore.backingStore.map { case (k, v) =>
      incStore(committedStore, k, v)
    }
    Future.join(commitFutures.toSeq)
  }

  /**
   * Adds the value to the staging store and, once the staging store holds at
   * least maxKeys entries, commits it and starts over with an empty MapStore.
   *
   * @param pair the ((Key, BatchID), Value) pair to merge in
   * @return a Future[Unit] chained onto the new staging-store future
   */
  override def increment(pair: ((Key, BatchID), Value)): Future[Unit] = {
    val (batchedKey, value) = pair
    val newStagingStoreFuture =
      stagingStoreFuture
        .flatMap { incStore(_, batchedKey, value) }
        .flatMap { stagingStore =>
          if (stagingStore.size >= maxKeys)
            commit(stagingStore) map { _ => new MapStore[(Key, BatchID), Value] }
          else
            Future.value(stagingStore) // already computed, no need for Future.apply
        }
    // The var is assigned only after the new future has been built from it,
    // so the read of the var happens before the write.
    stagingStoreFuture = newStagingStoreFuture
    // Chain on the local val rather than the var so that each call to
    // increment reads stagingStoreFuture exactly once.
    newStagingStoreFuture flatMap { _ => Future.Unit }
  }

  /**
   * Closes the current staging store.
   *
   * NOTE(review): staged values are not committed here and committedStore is
   * not closed; confirm that this is intended.
   */
  override def close: Unit = stagingStoreFuture foreach { _.close }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment