Spark Tutorial @ Mozlandia 2014
//==========================================================================
// A gentle tutorial to Spark and Telemetry
//==========================================================================
// Let's import some Spark classes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
// ... and some json parsing utilities
import org.json4s._
import org.json4s.jackson.JsonMethods._
// ... and some math functions
import scala.math._
// ... and the API for Telemetry.
import Mozilla.Telemetry._
//==========================================================================
// Spark: Configuration
//==========================================================================
// Brings in some default formatters for the json library (e.g. for dates).
implicit lazy val formats = DefaultFormats
// Configure Spark to run locally and use as many cores as available.
val conf = new SparkConf().setAppName("mozilla-telemetry").setMaster("local[*]")
// Create a Spark context; Spark applications run as independent sets of
// processes on a cluster, coordinated by the SparkContext object in your
// main program (called the driver).
implicit val sc = new SparkContext(conf)
//==========================================================================
// Spark: RDD
//==========================================================================
// Let's create our first RDD.
val dataset = sc.parallelize(1 to 1000)
// An RDD is divided into partitions.
dataset.partitions.length
// We can also read a file from the local filesystem into an RDD.
val passwd = sc.textFile("/etc/passwd")
// RDDs support two types of operations: transformations and actions.
// https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
//==========================================================================
// Spark: RDD Transformations
//==========================================================================
// A transformation creates a new dataset from an existing one.
val squared = dataset.map(x => x*x)
// The map operation is a method of the RDD, while the lambda function
// passed as an argument is an ordinary Scala function. As long as the code is
// serializable there are no restrictions on the kind of Scala code that can be executed.
// Note that all transformations in Spark are lazy; an action is required
// to actually realize a transformation, which explains why the map returned so quickly.
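// A minimal sketch of laziness; the names tripled and firstThree are
// illustrative only. Defining the map returns immediately because no job
// runs; the work only happens when the take action is called.
val tripled = dataset.map(x => x * 3)
val firstThree = tripled.take(3)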
// Let's keep all multiples of 3.
dataset.filter(x => x % 3 == 0)
// Let's get a sample of roughly 10% of the elements, without replacement.
dataset.sample(false, 0.1)
//==========================================================================
// Spark: RDD Actions
//==========================================================================
// An action returns a value after running a computation on the dataset.
squared.count
// Get the first element.
squared.first()
// Get the first k elements.
squared.take(5)
// Get a Scala array of all elements.
squared.collect
// Note that you can't access an arbitrary row of the RDD.
// Reduce the elements of the RDD.
squared.reduce((x, y) => x + y)
//==========================================================================
// Spark: RDD Caching
//==========================================================================
// Rerunning an action will retrigger the computation.
squared.count
// We can cache an RDD so that successive accesses to it don't recompute the
// whole thing.
squared.cache
squared.count
squared.count
// Once we are done with a dataset we can remove it from memory.
squared.unpersist()
// It doesn't seem like much right now, but once you start handling some real
// datasets you will appreciate the speed boost.
// Check out http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
// to learn more about the persistence policies.
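// As a sketch of choosing a persistence policy explicitly (the name cubed is
// illustrative only): cache is shorthand for persist(StorageLevel.MEMORY_ONLY),
// while MEMORY_AND_DISK spills partitions that don't fit in memory to disk.
import org.apache.spark.storage.StorageLevel
val cubed = dataset.map(x => x.toLong * x * x)
cubed.persist(StorageLevel.MEMORY_AND_DISK)
cubed.count
cubed.unpersist()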
//==========================================================================
// Spark: key-value pairs
//==========================================================================
// Most Spark operations work on RDDs of any type, but a few are reserved for
// RDDs of key-value pairs. The most common ones are distributed “shuffle”
// operations, such as grouping or aggregating the elements by a key.
val grouped = dataset.map(x => (x % 2 == 0, x))
// We can reduce by key,
grouped.reduceByKey((x, y) => x + y).collectAsMap()
// ... group by key,
grouped.groupByKey().collectAsMap()
// or count by key (action).
grouped.countByKey()
// Check out https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
// for a complete list of operations on key-value pairs.
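// As a further sketch (the names sumCount and meanByParity are illustrative
// only), a per-key mean can be computed with reduceByKey by carrying a
// (sum, count) pair, so values are combined without grouping them first.
val sumCount = grouped.mapValues(x => (x.toLong, 1L)).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
val meanByParity = sumCount.mapValues{ case(sum, n) => sum.toDouble / n }.collectAsMap()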
//==========================================================================
// Spark: doubles
//==========================================================================
// Some simple stats operations are reserved for RDDs of doubles.
val doubles = dataset.map(x => x.toDouble)
// The usual suspects...
doubles.mean()
doubles.sum()
doubles.stdev()
// You can even build a histogram! With 10 evenly spaced buckets, the result
// is a pair of (bucket boundaries, counts per bucket).
doubles.histogram(10)
// Check out https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.DoubleRDDFunctions
// for a complete list of operations on doubles.
//==========================================================================
// Telemetry: loading the data
//==========================================================================
// The Mozilla.Telemetry.Pings companion object can be used to build a wrapper
// for filtered Telemetry data. The filterable dimensions are application name,
// update channel, version, build-id and submission date.
val ds = Pings("Firefox", "nightly", "36.0a1", "20141110030204", "20141110")
// The RDD method of a Pings object returns an RDD containing a fraction
// of the pings, one per row.
val pings = ds.RDD(0.1)
// A ping is composed of a UUID and a JSON blob.
pings.first()
// Let's parse the JSON blobs and fetch the info and simpleMeasurements fields,
// which contain, respectively, data about the user's software/hardware
// configuration and startup timings (see about:telemetry).
val parsed = pings.map(ping => {
  // Skip the UUID prefix and parse the rest of the row as JSON.
  val json = parse(ping.substring(37))
  (json \ "info", json \ "simpleMeasurements")
})
// Each time an action is executed on the RDD, pings are fetched from S3 and parsed;
// this can quickly become slow.
parsed.first()
parsed.first()
// So let's cache them!
parsed.cache
parsed.first()
parsed.first()
// Caveat: caching the whole ping is a *bad* idea and there is probably never
// a good reason to do it.
//==========================================================================
// Telemetry: analysis
//==========================================================================
// We are interested in comparing the average startup time of Firefox across operating systems.
// Let's map to tuples of (OS, firstPaint).
val osStartup = parsed.map{ case(info, simple) =>
  ((info \ "OS").extract[String], (simple \ "firstPaint").extract[Int])
}
// Compute the mean and standard deviation.
osStartup.groupByKey().map{ case(os, times) => {
  // Convert to Double first so large sums and squares can't overflow an Int.
  val mean = times.map(_.toDouble).sum/times.size
  val sd = sqrt(times.map(t => t.toDouble*t).sum/times.size - mean*mean)
  (os, (mean, sd))
}}.collectAsMap()
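// We just made the simplifying assumption that all the startup times of one
// OS fit in memory at once, but what if they don't? The stats action of an
// RDD of doubles computes the same statistics without grouping the values.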
osStartup.filter(_._1 == "WINNT").map(_._2.toDouble).stats
osStartup.filter(_._1 == "Darwin").map(_._2.toDouble).stats
osStartup.filter(_._1 == "Linux").map(_._2.toDouble).stats
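// The three filters above each scan the data once. As a sketch of a
// single-pass alternative (the name statsByOs is illustrative only, and
// aggregateByKey assumes Spark 1.1 or later), a StatCounter can be merged per key.
import org.apache.spark.util.StatCounter
val statsByOs = osStartup.mapValues(_.toDouble).aggregateByKey(new StatCounter())(
  (acc, t) => acc.merge(t),
  (a, b) => a.merge(b)
)
statsByOs.collectAsMap()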
//==========================================================================
// The End
//==========================================================================
// Time to continue exploring on your own!