Created
February 14, 2013 07:49
-
-
Save sritchie/4951204 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.summingbird.storm.store | |
import backtype.storm.utils.DRPCClient | |
import com.twitter.bijection.Bijection | |
import com.twitter.util.{ Future, FuturePool } | |
import com.twitter.storehaus.ReadableStore | |
import com.twitter.summingbird.batch.BatchID | |
import com.twitter.summingbird.util.RpcBijection | |
import java.util.concurrent.Executors | |
import Bijection.asMethod // enable "as" syntax | |
/** | |
* Wrapper over backtype.storm.utils.DRPCClient. | |
* This ReadableStore allows the user to perform online read-only | |
* queries through Storm's DRPC mechanism. | |
* | |
* @author Oscar Boykin | |
* @author Sam Ritchie | |
*/ | |
object DRPCStore { | |
def apply[Key, Value](nimbusHost: String, appID: String, port: Int = 3772) | |
(implicit keyCodec: Bijection[Key, Array[Byte]], valCodec: Bijection[Value, Array[Byte]]) = | |
new DRPCStore[Key,Value](nimbusHost, appID, port) | |
} | |
class DRPCStore[Key, Value](nimbusHost: String, appID: String, port: Int) | |
(implicit kBijection: Bijection[Key, Array[Byte]], vBijection: Bijection[Value, Array[Byte]]) | |
extends ReadableStore[(Key, BatchID), Value] { | |
val futurePool = FuturePool(Executors.newFixedThreadPool(4)) | |
val drpcClient = new DRPCClient(nimbusHost, port) | |
implicit val pair2String: Bijection[(Key, BatchID), String] = RpcBijection.batchPair[Key] | |
implicit val val2String: Bijection[Option[Value], String] = RpcBijection.option[Value] | |
override def get(pair: (Key, BatchID)): Future[Option[Value]] = | |
futurePool { drpcClient.execute(appID, pair.as[String]) } | |
.map { _.as[Option[Value]] } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment