-
-
Save sritchie/cb28fa90eedce4647cb6 to your computer and use it in GitHub Desktop.
Convert to a store.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.summingbird.store | |
import com.twitter.algebird.Monoid | |
import com.twitter.util.Future | |
import com.twitter.summingbird.batch.BatchID | |
/**
 * @author Oscar Boykin
 * @author Sam Ritchie
 *
 * The CommittingOnlineStore wraps a ConcurrentMutableStore in a way that allows for batched
 * updates. The CommittingOnlineStore uses its Monoid[Value] to pre-aggregate key-value pairs
 * into an internal MapStore[Key,Value], only committing out to the wrapped mutable store
 * once the size of the internal store reaches a specified size (maxKeys).
 *
 * On read, the CommittingOnlineStore reads a value out of both the backing mutable store
 * and the internal store and merges them together with the Monoid[Value]. On system failure,
 * this store will fail to commit at most (maxKeys - 1) total key-value pairs.
 */
object CommittingOnlineStore {
  /**
   * Factory for [[CommittingOnlineStore]].
   *
   * @param maxKeys        forwarded unchanged to the class constructor
   * @param committedStore the mutable store that the new instance wraps
   * @return a new CommittingOnlineStore built from the two arguments
   */
  def apply[
    StoreType <: ConcurrentMutableStore[StoreType, (Key, BatchID), Value],
    Key,
    Value: Monoid
  ](
    maxKeys: Int,
    committedStore: StoreType
  ): CommittingOnlineStore[StoreType, Key, Value] =
    new CommittingOnlineStore[StoreType, Key, Value](maxKeys, committedStore)
}
/**
 * Store that stages increments in an in-memory MapStore and pushes them to
 * `committedStore` once the staging store holds at least `maxKeys` keys.
 *
 * @param maxKeys        staging-store size at which staged pairs are committed
 * @param committedStore the wrapped store that receives committed pairs
 */
class CommittingOnlineStore[StoreType <: ConcurrentMutableStore[StoreType, (Key, BatchID), Value], Key, Value: Monoid]
(maxKeys: Int, committedStore: StoreType)
// The self-type argument must be the fully applied type; the bare type
// constructor `CommittingOnlineStore` is not a proper type.
extends Store[CommittingOnlineStore[StoreType, Key, Value], (Key, BatchID), Value] {

  // Future of the current staging store. Each call to increment replaces
  // this with a future derived from the previous one.
  var stagingStoreFuture: Future[MapStore[(Key, BatchID), Value]] =
    Future.value(new MapStore[(Key, BatchID), Value])

  /**
   * Looks the key up in both the staging store and the committed store.
   * When both hold a value the two are combined with Monoid.plus;
   * otherwise whichever side is present (if any) is returned.
   */
  override def get(batchedKey: (Key, BatchID)): Future[Option[Value]] =
    stagingStoreFuture flatMap { stagingStore =>
      stagingStore.get(batchedKey)
        .join(committedStore.get(batchedKey))
        .map {
          case (Some(staged), Some(committed)) => Some(Monoid.plus(staged, committed))
          case (None, committed) => committed
          case (staged, None) => staged
        }
    }

  // incStore updates the supplied key in the supplied store with a
  // function from Option[V] => Option[V].
  //
  // If the key is already present, the function monoid-adds the new
  // value "v" to the existing value; if it isn't present, "v" is used
  // as-is. The result is passed through Monoid.nonZeroOption before
  // being handed back to the store's update.
  protected def incStore[S <: Store[S, (Key, BatchID), Value]](s: S, k: (Key, BatchID), v: Value): Future[S] =
    s.update(k) { oldV =>
      Monoid.nonZeroOption(oldV.map { Monoid.plus(_, v) } getOrElse v)
    }

  /**
   * Sinks every key-value pair of the supplied MapStore's backing store
   * into committedStore via incStore, and returns a future that joins
   * all of the per-pair commit futures.
   */
  def commit(mapStore: MapStore[(Key, BatchID), Value]): Future[Unit] = {
    // build up an iterable of futures, each of which is responsible
    // for committing a single key-value pair into the committedStore:
    val commitFutures = mapStore.backingStore.map { case (k, v) =>
      incStore(committedStore, k, v)
    }
    Future.join(commitFutures.toSeq)
  }

  /**
   * Adds the pair to the staging store. If the staging store then holds
   * at least maxKeys keys, its contents are committed and a fresh, empty
   * staging store takes its place.
   *
   * NOTE(review): the read and write of stagingStoreFuture below are not
   * synchronized; concurrent callers could presumably lose an update.
   * NOTE(review): if commit fails, the failed future is stored in
   * stagingStoreFuture, so later calls chain off a failed future. Confirm
   * whether that is intended.
   */
  override def increment(pair: ((Key, BatchID), Value)): Future[Unit] = {
    val (kPair, v) = pair
    val newStagingStoreFuture =
      stagingStoreFuture
        .flatMap { incStore(_, kPair, v) }
        .flatMap { stagingStore =>
          if (stagingStore.size >= maxKeys)
            commit(stagingStore) map { _ => new MapStore[(Key, BatchID), Value] }
          else
            Future.value(stagingStore) // already computed; no need for the by-name constructor
        }
    // Assignment happens after future creation to ensure that the var
    // assignment happens AFTER the read.
    stagingStoreFuture = newStagingStoreFuture
    // flatMap against newStagingStoreFuture to ensure a single read
    // of the stagingStoreFuture var per call to increment.
    newStagingStoreFuture flatMap { _ => Future.Unit }
  }

  // Closes the current staging store once its future resolves.
  // NOTE(review): staged pairs are not committed here and committedStore
  // is not closed; confirm whether callers are expected to handle both.
  override def close: Unit = { stagingStoreFuture foreach { _.close } }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment