# Telemetry tutorial

Created by @vitillo on September 30, 2015.
{"nbformat_minor": 0, "cells": [{"source": "### RDD", "cell_type": "markdown", "metadata": {}}, {"source": "Let's create our first [RDD](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD). Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver).", "cell_type": "markdown", "metadata": {}}, {"execution_count": 2, "cell_type": "code", "source": "dataset = sc.parallelize(range(1000))", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "A RDD is divided in partitions.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 7, "cell_type": "code", "source": "dataset.toDebugString()", "outputs": [{"execution_count": 7, "output_type": "execute_result", "data": {"text/plain": "'(16) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 []'"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "We can also read a file from the local filesystem into a RDD.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 8, "cell_type": "code", "source": "passwd = sc.textFile(\"/etc/passwd\")", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 9, "cell_type": "code", "source": "passwd", "outputs": [{"execution_count": 9, "output_type": "execute_result", "data": {"text/plain": "/etc/passwd MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "[RDD](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) support two type of operations: transformations and actions. ", "cell_type": "markdown", "metadata": {}}, {"source": "###RDD Transformations", "cell_type": "markdown", "metadata": {}}, {"source": "A transformation creates a new dataset from an existing one.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 11, "cell_type": "code", "source": "squared = dataset.map(lambda x: x*x)", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "The map operation is a method of the RDD, while the lambda function passed as argument is a common Scala function. 
Let's keep all multiples of 3.

```python
dataset.filter(lambda x: x % 3 == 0)
# PythonRDD[3] at RDD at PythonRDD.scala:43
```

Let's get a 10% sample without replacement.

```python
dataset.sample(False, 0.1)
# PythonRDD[4] at RDD at PythonRDD.scala:43
```

### RDD Actions

An action returns a value after running a computation on the dataset.

```python
squared.count()
# 1000
```

Get the first element.

```python
squared.first()
# 0
```

Get the first k elements.

```python
squared.take(5)
# [0, 1, 4, 9, 16]
```

Get a Python list of all elements.

```python
squared.collect()[:5]
# [0, 1, 4, 9, 16]
```

Note that you can't access an arbitrary row of the RDD.
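If you do need positional access, one workaround (a sketch, not part of the original notebook) is to pair every element with its index via `zipWithIndex` and filter on it:

```python
# (value, index) pairs; the indices follow the partition order of the RDD
indexed = squared.zipWithIndex()

# fetch the element at position 42 by filtering on the index
indexed.filter(lambda pair: pair[1] == 42).map(lambda pair: pair[0]).first()
# 1764, i.e. 42 squared
```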
Reduce the elements of the RDD.

```python
squared.reduce(lambda x, y: x + y)
# 332833500
```

### RDD Caching

Rerunning an action will retrigger the computation.

```python
squared.count()
# 1000
```

We can cache an RDD so that successive accesses to it don't recompute the whole thing.

```python
squared.cache()
# PythonRDD[6] at RDD at PythonRDD.scala:43

squared.count()
# 1000

squared.count()
# 1000
```

Once we are done with a dataset we can remove it from memory.

```python
squared.unpersist()
# PythonRDD[6] at RDD at PythonRDD.scala:43
```

Caching may not seem like much on a toy dataset, but once you start handling real datasets you will appreciate the speed boost, provided the data fits in memory. Check out the [documentation](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) to learn more about the persistence policies.
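For example, when a dataset is too large to fit in memory, `persist` accepts an explicit storage level that spills the excess partitions to disk instead of recomputing them. A sketch:

```python
from pyspark import StorageLevel

# keep what fits in memory and spill the remaining partitions to disk
# (a plain cache() keeps data in memory only)
squared.persist(StorageLevel.MEMORY_AND_DISK)
squared.count()
squared.unpersist()
```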
### Key-value pairs

Most Spark operations work on RDDs of any type, but a few are reserved for key-value pairs; the most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

```python
grouped = dataset.map(lambda x: (x % 2 == 0, x))
```

We can reduce by key,

```python
grouped.reduceByKey(lambda x, y: x + y).collectAsMap()
# {False: 250000, True: 249500}
```

... group by key,

```python
grouped.groupByKey().collectAsMap()
# {False: <pyspark.resultiterable.ResultIterable at 0x7f745b2b09d0>,
#  True: <pyspark.resultiterable.ResultIterable at 0x7f745b2b0890>}
```

(note that groupByKey yields an iterable per key; apply `mapValues(list)` before collecting if you want to materialize the grouped elements)

or count by key (an action).

```python
grouped.countByKey()
# defaultdict(<type 'int'>, {False: 500, True: 500})
```

Check out the [documentation](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) for a complete list of operations on key-value pairs.

### Doubles

Some simple stats operations are reserved for RDDs of scalars.

```python
dataset.mean()
# 499.5

dataset.sum()
# 499500

dataset.stdev()
# 288.67499025720952
```

You can also create a simple textual histogram.

```python
dataset.histogram(10)
# ([0.0, 99.9, 199.8, 299.70000000000005, 399.6, 499.5,
#   599.4000000000001, 699.3000000000001, 799.2, 899.1, 999],
#  [100, 100, 100, 100, 100, 100, 100, 100, 100, 100])
```

Check out the [documentation](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.DoubleRDDFunctions) for a complete list of operations on doubles.

##### Exercises
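The original notebook leaves this section as an empty cell. As a hypothetical warm-up combining the operations covered above, one could compute the sum of the squared multiples of 3:

```python
# chain transformations, then finish with actions
multiples = dataset.filter(lambda x: x % 3 == 0)
squares = multiples.map(lambda x: x * x)

squares.count()                      # 334
squares.reduce(lambda x, y: x + y)   # 111277611
```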