Skip to content

Instantly share code, notes, and snippets.

@MLnick
Created February 13, 2013 15:00
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save MLnick/4945185 to your computer and use it in GitHub Desktop.
Spark Streaming with CountMinSketch from Twitter Algebird
import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import spark.streaming.examples.twitter.TwitterInputDStream
import com.twitter.algebird._
import spark.streaming.StreamingContext._
import spark.SparkContext._
/**
 * Example of using the CountMinSketch monoid from Twitter's Algebird together with
 * Spark Streaming's TwitterInputDStream.
 *
 * For each 10-second batch of tweets it maintains both an approximate (Count-Min
 * Sketch) and an exact per-user tweet count — per batch and accumulated globally on
 * the driver — and prints the top heavy hitters from each, so the sketch's accuracy
 * can be compared against ground truth.
 */
object StreamingCMS {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: TwitterStreamingCMS <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    // CMS parameters. CountMinSketchMonoid's constructor is
    // (eps, delta, seed, heavyHittersPct); the original code passed the value it
    // called DELTA in the eps position and vice versa, so the names misdescribed
    // which accuracy guarantee each value controlled. Renamed here — the values
    // actually handed to the monoid are unchanged (eps = 1E-3, delta = 0.01).
    val EPS = 1E-3   // one-sided additive error bound on frequency estimates
    val DELTA = 0.01 // probability that the error bound is exceeded
    val SEED = 1     // fixed hash seed for reproducibility
    val PERC = 0.001 // heavy-hitter threshold: ids seen in >= 0.1% of all updates
    val TOP_K = 5    // number of heavy hitters to print per batch / overall

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.drop(3)

    val ssc = new StreamingContext(master, "TwitterStreamingCMS", Seconds(10))
    val stream = new TwitterInputDStream(ssc, username, password, filters,
      StorageLevel.MEMORY_ONLY_SER)
    ssc.registerInputStream(stream)

    // One user id per status update in the stream.
    val users = stream.map(status => status.getUser.getId)

    // Mutable driver-side state, accumulated across batches inside foreach below.
    // NOTE(review): this state lives only on the driver and is lost on restart —
    // it is not checkpointed.
    var globalCMS = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC).zero
    var globalExact = Map[Long, Int]()
    val mm = new MapMonoid[Long, Int]()

    // Approximate counts: build one single-element CMS per id within each
    // partition, then merge them all monoidally with ++.
    val approxTopUsers = users.mapPartitions(ids => {
      val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
      ids.map(id => cms.create(id))
    }).reduce(_ ++ _)

    // Exact counts for the same batch, for comparison with the sketch.
    val exactTopUsers = users.map(id => (id, 1))
      .reduceByKey((a, b) => a + b)

    approxTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        // The reduce above leaves a single merged CMS in the RDD.
        val partial = rdd.first()
        globalCMS ++= partial
        val globalTopK = globalCMS.heavyHitters
          .map(id => (id, globalCMS.frequency(id).estimate))
          .toSeq.sortBy(_._2).reverse.take(TOP_K)
        // PERC is a fraction, so scale by 100 for display (the original printed
        // "0.00%" because it formatted the raw fraction 0.001 as a percent value).
        println("Approx heavy hitters at %2.2f%% users this batch: %s".format(PERC * 100, partial.heavyHitters.mkString("[", ",", "]")))
        println("Approx heavy hitters at %2.2f%% users overall: %s".format(PERC * 100, globalTopK.mkString("[", ",", "]")))
      }
    })

    exactTopUsers.foreach(rdd => {
      if (rdd.count() != 0) {
        val partialMap = rdd.collect().toMap
        val partialTopK = rdd.map({ case (id, count) => (count, id) })
          .sortByKey(ascending = false).take(TOP_K)
        // globalExact is already an immutable Map — the redundant .toMap is gone.
        globalExact = mm.plus(globalExact, partialMap)
        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.take(TOP_K)
        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString(",")))
        println("Exact heavy hitters overall: %s".format(globalTopK.mkString(",")))
      }
    })

    ssc.start()
  }
}
@qiang5714
Copy link

Hi MLnick, is there any advice on how to keep the value globalCMS safe in a long-running Spark Streaming job? I keep thinking I may lose it, and I cannot figure out a way to save the CMS value to an external store. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment