Skip to content

Instantly share code, notes, and snippets.

@bigsnarfdude
Last active August 29, 2015 14:08
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save bigsnarfdude/81a7422a0de1e3f99a83 to your computer and use it in GitHub Desktop.
Save bigsnarfdude/81a7422a0de1e3f99a83 to your computer and use it in GitHub Desktop.
twitter redis spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
import org.sedis._
import redis.clients.jedis._
object TwitterWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: TwitterWordCount <master> [filter1]
[filter2] ... [filterN]")
System.exit(1)
}
val (master, filters) = (args.head, args.tail)
val pool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
val ssc = new StreamingContext(master, "TwitterWordCount", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
val words = stream.flatMap(status => status.getText.toLowerCase.split("
")).map(word => (word, 1l))
val cntWords = words.reduceByKey(_ + _)
cntWords.foreach(rdd =>
pool.withJedisClient { client =>
val pipeline = client.pipelined()
rdd.foreach {
case (word, count) =>
pipeline.incrBy(word, count)
}
pipeline.sync()
}
)
ssc.start()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment