Created September 30, 2015 17:26
# Telemetry tutorial
### RDD

Let's create our first [RDD](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD). Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver).

```python
dataset = sc.parallelize(range(1000))
```

An RDD is divided into partitions.

```python
dataset.toDebugString()
# '(16) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 []'
```

We can also read a file from the local filesystem into an RDD.

```python
passwd = sc.textFile("/etc/passwd")
passwd
# /etc/passwd MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
```

[RDDs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) support two types of operations: transformations and actions.

### RDD Transformations

A transformation creates a new dataset from an existing one.

```python
squared = dataset.map(lambda x: x*x)
```

The map operation is a method of the RDD, while the lambda passed as an argument is an ordinary Python function. As long as the code is serializable, there are no restrictions on the kind of Python code that can be executed.

Note that all transformations in Spark are lazy; an action is required to actually realize a transformation, which explains why the map returned so quickly.

Let's keep all multiples of 3.

```python
dataset.filter(lambda x: x % 3 == 0)
```

Let's get a 10% sample without replacement.

```python
dataset.sample(False, 0.1)
```

### RDD Actions

An action returns a value after running a computation on the dataset.

```python
squared.count()
# 1000
```

Get the first element.

```python
squared.first()
# 0
```

Get the first k elements.

```python
squared.take(5)
# [0, 1, 4, 9, 16]
```

Get a Python list of all elements.

```python
squared.collect()[:5]
# [0, 1, 4, 9, 16]
```

Note that you can't access an arbitrary row of the RDD.

Reduce the elements of the RDD.

```python
squared.reduce(lambda x, y: x + y)
# 332833500
```

### RDD Caching

Rerunning an action retriggers the computation.

```python
squared.count()
# 1000
```

We can cache an RDD so that successive accesses to it don't recompute the whole thing.

```python
squared.cache()
squared.count()
# 1000
squared.count()
# 1000
```

Once we are done with a dataset, we can remove it from memory.

```python
squared.unpersist()
```

This doesn't seem like much right now, but once you start handling real datasets you will appreciate the speed boost, assuming the dataset fits in memory, that is. Check out the [documentation](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) to learn more about the persistence policies.

### Key-value pairs

Most Spark operations work on RDDs of any type, but a few are reserved for key-value pairs; the most common are distributed "shuffle" operations, such as grouping or aggregating elements by a key.

```python
grouped = dataset.map(lambda x: (x % 2 == 0, x))
```

We can reduce by key,

```python
grouped.reduceByKey(lambda x, y: x + y).collectAsMap()
# {False: 250000, True: 249500}
```

... group by key,

```python
grouped.groupByKey().collectAsMap()
# {False: <pyspark.resultiterable.ResultIterable at 0x7f745b2b09d0>,
#  True: <pyspark.resultiterable.ResultIterable at 0x7f745b2b0890>}
```

or count by key (an action).

```python
grouped.countByKey()
# defaultdict(<type 'int'>, {False: 500, True: 500})
```

Check out the [documentation](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) for a complete list of operations on key-value pairs.

### Doubles

Some simple statistical operations are reserved for RDDs of scalars.

```python
dataset.mean()
# 499.5
dataset.sum()
# 499500
dataset.stdev()
# 288.67499025720952
```

You can also create a simple textual histogram.

```python
dataset.histogram(10)
# ([0.0, 99.9, 199.8, 299.70000000000005, 399.6, 499.5, 599.4000000000001,
#   699.3000000000001, 799.2, 899.1, 999],
#  [100, 100, 100, 100, 100, 100, 100, 100, 100, 100])
```

Check out the [documentation](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.DoubleRDDFunctions) for a complete list of operations on doubles.

##### Exercises
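Spark's laziness has a familiar pure-Python analogue: a generator also just builds a recipe and does no work until something consumes it. This is an illustrative sketch, not pyspark code, but it reproduces the value the notebook's `reduce` action returned.

```python
# A "transformation": returns instantly, computes nothing yet.
squared = (x * x for x in range(1000))

# The "action": consuming the generator forces the computation.
total = sum(squared)
print(total)  # 332833500, the same value squared.reduce(...) returned above
```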
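Since the shuffle examples only show Spark's outputs, it can help to see the same per-key semantics in plain Python, with no cluster required. `reduce_by_key` below is a hypothetical helper of ours, not part of the pyspark API; it mirrors what `reduceByKey` does on the same `(x % 2 == 0, x)` pairs.

```python
from collections import Counter

def reduce_by_key(pairs, fn):
    # Combine all values that share a key with fn, like RDD.reduceByKey.
    acc = {}
    for k, v in pairs:
        acc[k] = fn(acc[k], v) if k in acc else v
    return acc

# Same key-value pairs as in the notebook: the key is "x is even".
pairs = [(x % 2 == 0, x) for x in range(1000)]

sums = reduce_by_key(pairs, lambda a, b: a + b)
# sums[False] == 250000 and sums[True] == 249500, matching collectAsMap() above.

# countByKey is just a per-key tally:
counts = Counter(k for k, _ in pairs)
# 500 odd and 500 even numbers, matching countByKey() above.
```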
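To make the bucket edges in the `histogram(10)` output less mysterious, here is a pure-Python sketch of what `RDD.histogram(n)` computes for its integer-argument form: `n` evenly spaced buckets between the minimum and maximum. The `histogram` function below is our own approximation, not the pyspark implementation.

```python
def histogram(values, buckets):
    # n evenly spaced buckets between min and max; the maximum value is
    # clamped into the last bucket, which is closed on both ends.
    lo, hi = min(values), max(values)
    step = (hi - lo) / float(buckets)
    counts = [0] * buckets
    for v in values:
        i = min(int((v - lo) / step), buckets - 1)  # clamp into last bucket
        counts[i] += 1
    edges = [lo + i * step for i in range(buckets)] + [hi]
    return edges, counts

edges, counts = histogram(list(range(1000)), 10)
print(counts)  # [100, 100, 100, 100, 100, 100, 100, 100, 100, 100]
```

The floating-point noise in edges such as `299.70000000000005` comes from the repeated `min + i * step` arithmetic, which is why the notebook's output looks untidy.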