Created
November 1, 2015 22:27
Fast Derived Streams
{"nbformat_minor": 0, "cells": [{"source": "### Fast Derived Streams", "cell_type": "markdown", "metadata": {}}, {"source": "This is a short example of the capabilities provided by derived streams stored in an analysis-friendly format.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 1, "cell_type": "code", "source": "import numpy as np\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.types import *\n\n%pylab inline", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Populating the interactive namespace from numpy and matplotlib\n"}], "metadata": {"scrolled": false, "collapsed": false, "trusted": true}}, {"source": "Note that we are using a cluster with a single worker:", "cell_type": "markdown", "metadata": {}}, {"execution_count": 2, "cell_type": "code", "source": "sc.defaultParallelism", "outputs": [{"execution_count": 2, "output_type": "execute_result", "data": {"text/plain": "16"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "As fast derived streams have a schema, they can be loaded into [Spark dataframes](https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame). A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a pandas DataFrame in Python. Unlike with RDDs, before any computation on a DataFrame starts, the [Catalyst optimizer](https://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html) compiles the operations that were used to build the DataFrame into a physical plan for execution. 
Since the optimizer generates JVM bytecode for execution, pyspark users will experience the same high performance as Scala users.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 4, "cell_type": "code", "source": "dataset = sqlContext.load(\"s3://telemetry-parquet/ExecutiveStream\", \"parquet\")", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Note that the above command might fail the first time it's executed with \"Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient\". If that happens, just rerun the command and it should complete without failure.", "cell_type": "markdown", "metadata": {}}, {"source": "In this example we are deliberately not using the partitioning scheme on S3, which would speed up common filtering operations (e.g. filtering by channel and/or date), and are instead relying completely on Spark. In a real setting we would want to exploit it, though, by providing access to fast derived streams through, e.g., the *get_records* function from the *moztelemetry* package.", "cell_type": "markdown", "metadata": {}}, {"source": "How many records are we looking at?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 5, "cell_type": "code", "source": "%time dataset.count()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 4 ms, sys: 4 ms, total: 8 ms\nWall time: 23 s\n"}, {"execution_count": 5, "output_type": "execute_result", "data": {"text/plain": "1157012324L"}, "metadata": {}}], "metadata": {"scrolled": true, "collapsed": false, "trusted": true}}, {"source": "What's the schema of this derived stream?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 6, "cell_type": "code", "source": "dataset.printSchema()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "root\n |-- docType: string (nullable = false)\n |-- submissionDate: string (nullable = false)\n |-- activityTimestamp: double (nullable = false)\n |-- profileCreationTimestamp: double 
(nullable = false)\n |-- clientId: string (nullable = false)\n |-- documentId: string (nullable = false)\n |-- country: string (nullable = false)\n |-- channel: string (nullable = false)\n |-- os: string (nullable = false)\n |-- osVersion: string (nullable = false)\n |-- default: boolean (nullable = false)\n |-- buildId: string (nullable = false)\n |-- app: string (nullable = false)\n |-- version: string (nullable = false)\n |-- vendor: string (nullable = false)\n |-- reason: string (nullable = false)\n |-- hours: double (nullable = false)\n |-- google: integer (nullable = false)\n |-- yahoo: integer (nullable = false)\n |-- bing: integer (nullable = false)\n |-- other: integer (nullable = false)\n |-- pluginHangs: integer (nullable = false)\n\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Note that a documentation string could easily be added within the schema itself in the future, making the datasets effectively self-documenting.", "cell_type": "markdown", "metadata": {}}, {"source": "Let's have a quick look at the data", "cell_type": "markdown", "metadata": {}}, {"execution_count": 7, "cell_type": "code", "source": "%time dataset.show()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "docType submissionDate activityTimestamp profileCreationTimestamp clientId documentId country channel os osVersion default buildId app version vendor reason hours google yahoo bing other pluginHangs\nmain 20150927 1.443114E18 1.4431392E18 0bde6e1d-c034-4d2... 29bb48b3-1de6-4ad... ID beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 0.001666666666666... 0 0 0 0 0 \nmain 20150927 1.4432436E18 1.3086144E18 e4368bb6-3167-4b0... 756e3937-e2b5-450... US release Windows 5.1 false 20150917150946 Firefox 41.0 Mozilla shutdown 0.105 0 0 0 0 0 \nmain 20150927 1.4431644E18 1.4197248E18 c13608a8-19d7-4a8... 4a04741b-c226-4c4... 
US release Windows 10.0 false 20150826023504 Firefox 40.0.3 Mozilla shutdown 3.3183333333333334 0 0 0 4 0 \nmain 20150927 1.4432184E18 1.4347584E18 6a494f9c-6058-4cf... 3240ea6f-4622-453... DE beta Windows 6.1 true 20150907144446 Firefox 41.0 Mozilla shutdown 0.08722222222222223 0 0 0 0 0 \nmain 20150927 1.44324E18 1.4410656E18 421e2430-381e-4c8... d2498590-8adb-455... US beta Windows 10.0 false 20150921151815 Firefox 42.0 Mozilla shutdown 0.03277777777777778 0 0 0 0 0 \nmain 20150927 1.4433048E18 1.41048E18 639b4743-ec24-485... 28a649a8-330e-450... FR release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.05361111111111111 0 0 0 0 0 \nmain 20150927 1.4433084E18 1.3512096E18 478e19d8-8919-40d... 2135ebcc-b1bb-45a... GB release Windows 6.1 false 20150826023504 Firefox 40.0.3 Mozilla daily 1.0 0 0 0 0 0 \nmain 20150927 1.4431968E18 1.4239584E18 478ac140-e873-415... acf4161e-7647-424... MY beta Windows 10.0 false 20150824144923 Firefox 41.0 Mozilla shutdown 2.4033333333333333 0 0 0 0 0 \nmain 20150927 1.443114E18 1.4431392E18 0bde6e1d-c034-4d2... 022ace1a-ea2f-41d... ID beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 0.002777777777777778 0 0 0 0 0 \nmain 20150927 1.4426172E18 1.4147136E18 cdd54607-650d-4c4... e78a298d-a3df-4ba... GB beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 11.322777777777778 0 0 0 0 0 \nmain 20150927 1.4431356E18 1.3656384E18 5f38d579-a93d-412... ae11b6da-fa5c-45e... GB release Windows 5.1 false 20150826023504 Firefox 40.0.3 Mozilla daily 23.99972222222222 0 0 0 0 0 \nmain 20150927 1.4432868E18 1.426464E18 85d6c7cd-241c-45f... b5b4fd28-7f3c-487... KH beta Windows 6.1 true 20150921151815 Firefox 42.0 Mozilla shutdown 0.006944444444444444 0 0 0 0 0 \nmain 20150927 1.443222E18 1.2966048E18 d6b48518-0248-458... ce37f0c6-62a4-48a... IE release Windows 5.1 true 20150826023504 Firefox 40.0.3 Mozilla daily 3.046111111111111 0 0 0 0 0 \nmain 20150927 1.4432508E18 1.4197248E18 c13608a8-19d7-4a8... 
58077384-b0ac-411... US release Windows 10.0 false 20150826023504 Firefox 40.0.3 Mozilla shutdown 0.19444444444444445 0 0 0 0 0 \nmain 20150927 1.443222E18 1.3510368E18 d44d08af-9a21-4ab... 5bf068f5-c94d-485... GB release Windows 6.3 true 20150917150946 Firefox 41.0 Mozilla daily 24.0 0 0 0 0 0 \nmain 20150927 1.4432544E18 1.3239072E18 f2a07edb-762d-402... ea470cf4-5c0b-454... US release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.3213888888888889 0 0 0 0 0 \nmain 20150927 1.443222E18 1.217376E18 7cf784f0-ed1d-49f... 901191fd-d975-4d7... GB release Windows 5.1 true 20150917150946 Firefox 41.0 Mozilla daily 16.766666666666666 8 0 0 0 0 \nmain 20150927 1.443222E18 1.4364864E18 98e4f9fc-b813-43c... 93cf50e7-8e9b-4c9... IE release Windows 5.1 true 20150917150946 Firefox 41.0 Mozilla daily 13.645 2 0 0 0 0 \nmain 20150927 1.4433048E18 1.41048E18 639b4743-ec24-485... 8e4eaa9c-64f1-443... FR release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.16416666666666666 0 0 0 0 0 \nmain 20150927 1.443222E18 1.4219712E18 0400f102-db0a-47c... f61d10cf-2fdb-400... 
DO beta Windows 6.3 true 20150921151815 Firefox 42.0 Mozilla daily 6.629166666666666 2 0 0 0 0 \nCPU times: user 12 ms, sys: 4 ms, total: 16 ms\nWall time: 3.76 s\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "How many days are there?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 8, "cell_type": "code", "source": "%%time \n\ndates = dataset.select(\"submissionDate\").distinct().collect()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 16 ms, sys: 4 ms, total: 20 ms\nWall time: 58.3 s\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 9, "cell_type": "code", "source": "map(lambda x: x.submissionDate, dates)", "outputs": [{"execution_count": 9, "output_type": "execute_result", "data": {"text/plain": "[u'20150930',\n u'20151001',\n u'20151002',\n u'20151003',\n u'20151004',\n u'20151005',\n u'20151006',\n u'20151007',\n u'20151008',\n u'20151009',\n u'20151010',\n u'20151011',\n u'20151012',\n u'20151013',\n u'20151014',\n u'20151015',\n u'20151016',\n u'20151017',\n u'20151018',\n u'20151019',\n u'20151020',\n u'20151021',\n u'20151022',\n u'20151023',\n u'20151024',\n u'20151025',\n u'20151026',\n u'20151027',\n u'20151028',\n u'20151029',\n u'20151030',\n u'20151031',\n u'20150927',\n u'20150928',\n u'20150929']"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 10, "cell_type": "code", "source": "len(dates)", "outputs": [{"execution_count": 10, "output_type": "execute_result", "data": {"text/plain": "35"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Now filter on release:", "cell_type": "markdown", "metadata": {}}, {"execution_count": 11, "cell_type": "code", "source": "release_dataset = dataset.filter(dataset[\"channel\"] == \"release\")", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "How many records are there?", "cell_type": "markdown", "metadata": 
{}}, {"execution_count": 12, "cell_type": "code", "source": "%time release_dataset.count()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 8 ms, sys: 0 ns, total: 8 ms\nWall time: 36 s\n"}, {"execution_count": 12, "output_type": "execute_result", "data": {"text/plain": "683641652L"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "What's the median number of fragments per client over all days?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 13, "cell_type": "code", "source": "%time np.median(release_dataset.groupBy(\"clientId\").count().select(\"count\").collect())", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 57.2 s, sys: 2.95 s, total: 1min\nWall time: 12min 5s\n"}, {"execution_count": 13, "output_type": "execute_result", "data": {"text/plain": "30.0"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}], "nbformat": 4, "metadata": {"kernelspec": {"display_name": "Python 2", "name": "python2", "language": "python"}, "language_info": {"mimetype": "text/x-python", "nbconvert_exporter": "python", "version": "2.7.9", "name": "python", "file_extension": ".py", "pygments_lexer": "ipython2", "codemirror_mode": {"version": 2, "name": "ipython"}}}}