Skip to content

Instantly share code, notes, and snippets.

@rirakkumya
Created February 8, 2012 07:17
Show Gist options
  • Save rirakkumya/1766419 to your computer and use it in GitHub Desktop.
Save rirakkumya/1766419 to your computer and use it in GitHub Desktop.
【twitter Storm】ScalaStormとscala-redisでword countっちゃうコード
package storm.scala.examples
import storm.scala.dsl._
import backtype.storm.Config
import backtype.storm.LocalCluster
import backtype.storm.topology.TopologyBuilder
import backtype.storm.tuple.{Fields, Tuple, Values}
import com.redis._
import collection.mutable.{Map, HashMap}
import util.Random
/** Spout declaring a single output field, "sentence".
  *
  * Each call to `nextTuple` pauses for 100 ms and then emits one entry of
  * `sentences`, chosen at a random index.
  */
class RandomSentenceSpout extends StormSpout(outputFields = List("sentence")) {

  /** Fixed pool of sentences the spout picks from. */
  val sentences = List(
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature"
  )

  /** Sleeps 100 ms, then emits one randomly selected sentence. */
  def nextTuple = {
    Thread.sleep(100)
    val chosen = sentences(Random.nextInt(sentences.length))
    emit(chosen)
  }
}
/** An example of using `matchSeq` for Scala pattern matching of Storm tuples,
  * plus the emit and ack DSLs.
  *
  * Declares a single output field, "word". For a tuple whose only value is a
  * String, the sentence is split on single spaces, every piece is emitted
  * anchored to the incoming tuple, and the incoming tuple is then acked.
  */
class SplitSentence extends StormBolt(outputFields = List("word")) {
  def execute(t: Tuple) = t matchSeq {
    case Seq(sentence: String) =>
      // One anchored emit per space-separated token.
      for (word <- sentence.split(" ")) {
        using.anchor(t).emit(word)
      }
      // Ack only after all words of this sentence have been emitted.
      t.ack
  }
}
/** Bolt that counts words in Redis.
  *
  * For every incoming tuple it issues a Redis `incr` on the key given by the
  * tuple's first field (read as a String) and then acks the tuple.
  */
class Store extends StormBolt(List("word")) {

  // Assigned inside the `setup` block rather than at construction time.
  // NOTE(review): presumably so the client is created where the bolt runs
  // instead of being shipped with the bolt instance — confirm against ScalaStorm.
  var r: RedisClient = _

  setup {
    r = new RedisClient()
  }

  /** Increments the counter stored under the tuple's first field, then acks. */
  def execute(t: Tuple) = {
    val word = t.getString(0)
    r.incr(word)
    t.ack
  }
}
/** Entry point: runs the word-count topology on a local cluster and prints
  * the counts that ended up in Redis.
  *
  * Steps, in order:
  *  1. Flush Redis (`flushall`) so the run starts without existing keys.
  *  2. Wire the topology: "randsentence" spout -> "split" bolt (shuffle
  *     grouping) -> "countStore" bolt (fields grouping on "word").
  *  3. Submit it to a `LocalCluster`, let it run for 2 seconds, shut it down.
  *  4. Read every key back from Redis and print `(key, value)` pairs.
  */
object WordCountTopology extends App {
  val r = new RedisClient()
  r.flushall

  val builder = new TopologyBuilder
  builder.setSpout("randsentence", new RandomSentenceSpout, 1)
  builder.setBolt("split", new SplitSentence, 10)
    .shuffleGrouping("randsentence")
  builder.setBolt("countStore", new Store, 10)
    .fieldsGrouping("split", new Fields("word"))

  val conf = new Config
  conf.setDebug(true)
  conf.setMaxTaskParallelism(3)

  val cluster = new LocalCluster
  cluster.submitTopology("word-count", conf, builder.createTopology)
  // Let the topology process tuples for a fixed 2 seconds before stopping it.
  Thread.sleep(2000)
  cluster.shutdown

  // `collect` keeps only the keys that are present. The previous `map` had a
  // `case None =>` branch that produced `()`, so absent entries were printed
  // as "()" and the result element type widened to Any.
  r.keys().getOrElse(List.empty).collect {
    case Some(key) => (key, r.get(key).getOrElse(""))
  }.foreach(println)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment