Spark Tutorial @ Mozlandia 2014
//==========================================================================
// A gentle tutorial to Spark and Telemetry
//==========================================================================
// Let's import some Spark classes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
// ... and some json parsing utilities
import org.json4s._
import org.json4s.jackson.JsonMethods._
// ... and some math functions
import scala.math._
// ... and the API for Telemetry.
import Mozilla.Telemetry._
//==========================================================================
// Spark: Configuration
//==========================================================================
// Brings in some default formatters for the json library (e.g. for dates).
implicit lazy val formats = DefaultFormats
// Configure Spark to run locally and use as many cores as available.
val conf = new SparkConf().setAppName("mozilla-telemetry").setMaster("local[*]")
// Create a Spark context; Spark applications run as independent sets of
// processes on a cluster, coordinated by the SparkContext object in your
// main program (called the driver).
implicit val sc = new SparkContext(conf)
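// A hedged aside (not part of the original walkthrough): with the local[*]
// master, Spark derives its default parallelism from the number of available
// cores, which you can inspect on the context itself.
sc.defaultParallelism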
//==========================================================================
// Spark: RDD
//==========================================================================
// Let's create our first RDD.
val dataset = sc.parallelize(1 to 1000)
// An RDD is divided into partitions.
dataset.partitions.length
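// A hedged aside: the number of partitions can also be chosen explicitly,
// either up front when creating the RDD or by repartitioning an existing one.
val eightPartitions = sc.parallelize(1 to 1000, 8)
eightPartitions.partitions.length
eightPartitions.repartition(4).partitions.length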
// We can also read a file from the local filesystem into an RDD.
val passwd = sc.textFile("/etc/passwd")
// RDDs support two types of operations: transformations and actions.
// https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
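// A quick hedged illustration of the distinction: filter is a transformation
// (lazy, returns a new RDD), while count is an action (triggers the computation).
passwd.filter(line => line.contains("/bin/bash")).count()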
//==========================================================================
// Spark: RDD Transformations
//==========================================================================
// A transformation creates a new dataset from an existing one.
val squared = dataset.map(x => x*x)
// The map operation is a method of the RDD, while the lambda function
// passed as argument is a common Scala function. As long as the code is serializable
// there are no restrictions on the kind of Scala code that can be executed.
// Note that all transformations in Spark are lazy; an action is required
// to actually realize a transformation, which explains why the map returned so quickly.
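// A hedged aside: toDebugString shows the lineage of transformations that
// Spark has recorded but not yet executed.
squared.toDebugString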
// Let's keep all multiples of 3.
dataset.filter(x => x % 3 == 0)
// Let's get a 10% sample without replacement.
dataset.sample(false, 0.1)
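// A hedged aside: sample also accepts a seed as a third argument, which makes
// the sample reproducible across runs.
dataset.sample(false, 0.1, 42)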
//==========================================================================
// Spark: RDD Actions
//==========================================================================
// An action returns a value after running a computation on the dataset.
squared.count
// Get the first element.
squared.first()
// Get the first k elements.
squared.take(5)
// Get a Scala array of all elements.
squared.collect
// Note that you can't access an arbitrary row of the RDD.
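// A hedged workaround sketch: if you really do need element i, zipWithIndex
// pairs each element with its position, which you can then filter on.
squared.zipWithIndex().filter{ case (_, index) => index == 41 }.collect()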
// Reduce the elements of the RDD.
squared.reduce((x, y) => x + y)
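// A hedged aside: fold is a close relative of reduce that takes an explicit
// zero value for the accumulator.
squared.fold(0)((x, y) => x + y)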
//==========================================================================
// Spark: RDD Caching
//==========================================================================
// Rerunning an action will retrigger the computation.
squared.count
// We can cache an RDD so that successive accesses to it don't recompute the
// whole thing.
squared.cache
squared.count
squared.count
// Once we are done with a dataset we can remove it from memory.
squared.unpersist()
// Doesn't seem like much right now but once you start handling some real
// datasets you will appreciate the speed boost.
// Check out http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
// to learn more about the persistence policies.
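// A hedged sketch of an alternative persistence policy: persist lets you pick
// a storage level explicitly, e.g. spilling partitions to disk when memory
// runs out.
import org.apache.spark.storage.StorageLevel
squared.persist(StorageLevel.MEMORY_AND_DISK)
squared.unpersist()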
//==========================================================================
// Spark: key-value pairs
//==========================================================================
// Most Spark operations work on RDDs of any type, but a few are reserved for
// key-value pairs; the most common ones are distributed “shuffle” operations,
// such as grouping or aggregating the elements by a key.
val grouped = dataset.map(x => (x % 2 == 0, x))
// We can reduce by key,
grouped.reduceByKey((x, y) => x + y).collectAsMap()
// ... group by key,
grouped.groupByKey().collectAsMap()
// or count by key (action).
grouped.countByKey()
// Check out https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
// for a complete list of operations on key-value pairs.
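// A hedged sketch of two more pair operations: mapValues transforms only the
// values, and join matches up two pair RDDs on their keys. Here we combine
// per-key sums and counts to get a per-key mean.
val sums = grouped.reduceByKey((x, y) => x + y)
val counts = grouped.mapValues(_ => 1).reduceByKey((x, y) => x + y)
sums.join(counts).mapValues{ case (sum, n) => sum.toDouble / n }.collectAsMap()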
//==========================================================================
// Spark: doubles
//==========================================================================
// Some simple stats operations are reserved for RDDs of doubles.
val doubles = dataset.map(x => x.toDouble)
// The usual suspects...
doubles.mean()
doubles.sum()
doubles.stdev()
// You can even build a histogram!
doubles.histogram(10)
// Check out https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.DoubleRDDFunctions
// for a complete list of operations on doubles.
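// A hedged aside: stats() collects count, mean, standard deviation, min and
// max into a single StatCounter in one pass over the data.
doubles.stats()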
//==========================================================================
// Telemetry: loading the data
//==========================================================================
// The Mozilla.Telemetry.Pings companion object can be used to build a wrapper
// for filtered Telemetry data. The filterable dimensions are application name,
// update channel, version, build-id and submission date.
val ds = Pings("Firefox", "nightly", "36.0a1", "20141110030204", "20141110")
// The RDD method of a Pings object returns an RDD containing a fraction
// of the pings, one per row.
val pings = ds.RDD(0.1)
// A ping is composed of a UUID and a json blob.
pings.first()
// Let's parse the json blobs and fetch the info and simpleMeasurements fields,
// which contain respectively data about the user's software/hardware
// configuration and startup timings (see about:telemetry).
val parsed = pings.map(ping => {
  // Drop the leading UUID to get at the JSON payload, then pull out the two fields.
  val parsed = parse(ping.substring(37))
  (parsed \ "info", parsed \ "simpleMeasurements")
})
// Each time an action is executed on the RDD, pings are fetched from S3 and parsed;
// this can quickly become slow.
parsed.first()
parsed.first()
// So let's cache them!
parsed.cache
parsed.first()
parsed.first()
// Caveat: caching the whole ping is a *bad* idea and there is probably never
// a good reason to do it.
//==========================================================================
// Telemetry: analysis
//==========================================================================
// We are interested in comparing the average startup time of FF for different OSs.
// Let's map to tuples of (OS, firstPaint).
val osStartup = parsed.map{ case(info, simple) =>
  ((info \ "OS").extract[String], (simple \ "firstPaint").extract[Int])
}
// Compute the mean and standard deviation.
osStartup.groupByKey().map{ case(os, times) =>
  val mean = times.sum.toDouble/times.size
  val sd = sqrt(times.map(t => (t*t).toDouble).sum/times.size - mean*mean)
  (os, (mean, sd))
}.collectAsMap()
// We just made the simplifying assumption that the reduced collections fit
// in the driver's memory, but what if they don't?
osStartup.filter(_._1 == "WINNT").map(_._2.toDouble).stats
osStartup.filter(_._1 == "Darwin").map(_._2.toDouble).stats
osStartup.filter(_._1 == "Linux").map(_._2.toDouble).stats
//==========================================================================
// The End
//==========================================================================
// Time to continue exploring on your own!