@vitillo
Created November 1, 2015 22:27
Fast Derived Streams
{"nbformat_minor": 0, "cells": [{"source": "### Fast Derived Streams", "cell_type": "markdown", "metadata": {}}, {"source": "This is a short example of the capabilities provided by derived streams stored in an analysis friendly format.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 1, "cell_type": "code", "source": "import numpy as np\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.types import *\n\n%pylab inline", "outputs": [{"output_type": "stream", "name": "stdout", "text": "Populating the interactive namespace from numpy and matplotlib\n"}], "metadata": {"scrolled": false, "collapsed": false, "trusted": true}}, {"source": "Note that we are using a cluster with a single worker:", "cell_type": "markdown", "metadata": {}}, {"execution_count": 2, "cell_type": "code", "source": "sc.defaultParallelism", "outputs": [{"execution_count": 2, "output_type": "execute_result", "data": {"text/plain": "16"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "As fast derived streams have a schema they can be loaded into [Spark dataframes](https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame). A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a pandas Dataframe in Python. Unlike with RDDs, before any computation on a DataFrame starts, the [Catalyst optimizer](https://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html) compiles the operations that were used to build the DataFrame into a physical plan for execution. 
Since the optimizer generates JVM bytecode for execution, pyspark users will experience the same high performance as Scala users.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 4, "cell_type": "code", "source": "dataset = sqlContext.load(\"s3://telemetry-parquet/ExecutiveStream\", \"parquet\")", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Note that the above command might fail the first time it's executed with \"Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient\". If that happens, just rerun the command and it should complete without failure.", "cell_type": "markdown", "metadata": {}}, {"source": "In this example we are deliberately not using the partitioning scheme on S3 to speed up common filtering operations (e.g. filter by channel and/or date) but relying completely on Spark. In a real setting we would want to exploit it though by providing access to fast derived streams with, e.g., the *get_records* function from the *moztelemetry* package.", "cell_type": "markdown", "metadata": {}}, {"source": "How many records are we looking at?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 5, "cell_type": "code", "source": "%time dataset.count()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 4 ms, sys: 4 ms, total: 8 ms\nWall time: 23 s\n"}, {"execution_count": 5, "output_type": "execute_result", "data": {"text/plain": "1157012324L"}, "metadata": {}}], "metadata": {"scrolled": true, "collapsed": false, "trusted": true}}, {"source": "What's the schema of this derived stream?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 6, "cell_type": "code", "source": "dataset.printSchema()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "root\n |-- docType: string (nullable = false)\n |-- submissionDate: string (nullable = false)\n |-- activityTimestamp: double (nullable = false)\n |-- profileCreationTimestamp: double 
(nullable = false)\n |-- clientId: string (nullable = false)\n |-- documentId: string (nullable = false)\n |-- country: string (nullable = false)\n |-- channel: string (nullable = false)\n |-- os: string (nullable = false)\n |-- osVersion: string (nullable = false)\n |-- default: boolean (nullable = false)\n |-- buildId: string (nullable = false)\n |-- app: string (nullable = false)\n |-- version: string (nullable = false)\n |-- vendor: string (nullable = false)\n |-- reason: string (nullable = false)\n |-- hours: double (nullable = false)\n |-- google: integer (nullable = false)\n |-- yahoo: integer (nullable = false)\n |-- bing: integer (nullable = false)\n |-- other: integer (nullable = false)\n |-- pluginHangs: integer (nullable = false)\n\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Note that a documentation string could easily be added within the schema itself in the future, making the datasets effectively self-documenting.", "cell_type": "markdown", "metadata": {}}, {"source": "Let's have a quick look at the data", "cell_type": "markdown", "metadata": {}}, {"execution_count": 7, "cell_type": "code", "source": "%time dataset.show()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "docType submissionDate activityTimestamp profileCreationTimestamp clientId documentId country channel os osVersion default buildId app version vendor reason hours google yahoo bing other pluginHangs\nmain 20150927 1.443114E18 1.4431392E18 0bde6e1d-c034-4d2... 29bb48b3-1de6-4ad... ID beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 0.001666666666666... 0 0 0 0 0 \nmain 20150927 1.4432436E18 1.3086144E18 e4368bb6-3167-4b0... 756e3937-e2b5-450... US release Windows 5.1 false 20150917150946 Firefox 41.0 Mozilla shutdown 0.105 0 0 0 0 0 \nmain 20150927 1.4431644E18 1.4197248E18 c13608a8-19d7-4a8... 4a04741b-c226-4c4... 
US release Windows 10.0 false 20150826023504 Firefox 40.0.3 Mozilla shutdown 3.3183333333333334 0 0 0 4 0 \nmain 20150927 1.4432184E18 1.4347584E18 6a494f9c-6058-4cf... 3240ea6f-4622-453... DE beta Windows 6.1 true 20150907144446 Firefox 41.0 Mozilla shutdown 0.08722222222222223 0 0 0 0 0 \nmain 20150927 1.44324E18 1.4410656E18 421e2430-381e-4c8... d2498590-8adb-455... US beta Windows 10.0 false 20150921151815 Firefox 42.0 Mozilla shutdown 0.03277777777777778 0 0 0 0 0 \nmain 20150927 1.4433048E18 1.41048E18 639b4743-ec24-485... 28a649a8-330e-450... FR release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.05361111111111111 0 0 0 0 0 \nmain 20150927 1.4433084E18 1.3512096E18 478e19d8-8919-40d... 2135ebcc-b1bb-45a... GB release Windows 6.1 false 20150826023504 Firefox 40.0.3 Mozilla daily 1.0 0 0 0 0 0 \nmain 20150927 1.4431968E18 1.4239584E18 478ac140-e873-415... acf4161e-7647-424... MY beta Windows 10.0 false 20150824144923 Firefox 41.0 Mozilla shutdown 2.4033333333333333 0 0 0 0 0 \nmain 20150927 1.443114E18 1.4431392E18 0bde6e1d-c034-4d2... 022ace1a-ea2f-41d... ID beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 0.002777777777777778 0 0 0 0 0 \nmain 20150927 1.4426172E18 1.4147136E18 cdd54607-650d-4c4... e78a298d-a3df-4ba... GB beta Windows 6.1 true 20150910171927 Firefox 41.0 Mozilla shutdown 11.322777777777778 0 0 0 0 0 \nmain 20150927 1.4431356E18 1.3656384E18 5f38d579-a93d-412... ae11b6da-fa5c-45e... GB release Windows 5.1 false 20150826023504 Firefox 40.0.3 Mozilla daily 23.99972222222222 0 0 0 0 0 \nmain 20150927 1.4432868E18 1.426464E18 85d6c7cd-241c-45f... b5b4fd28-7f3c-487... KH beta Windows 6.1 true 20150921151815 Firefox 42.0 Mozilla shutdown 0.006944444444444444 0 0 0 0 0 \nmain 20150927 1.443222E18 1.2966048E18 d6b48518-0248-458... ce37f0c6-62a4-48a... IE release Windows 5.1 true 20150826023504 Firefox 40.0.3 Mozilla daily 3.046111111111111 0 0 0 0 0 \nmain 20150927 1.4432508E18 1.4197248E18 c13608a8-19d7-4a8... 
58077384-b0ac-411... US release Windows 10.0 false 20150826023504 Firefox 40.0.3 Mozilla shutdown 0.19444444444444445 0 0 0 0 0 \nmain 20150927 1.443222E18 1.3510368E18 d44d08af-9a21-4ab... 5bf068f5-c94d-485... GB release Windows 6.3 true 20150917150946 Firefox 41.0 Mozilla daily 24.0 0 0 0 0 0 \nmain 20150927 1.4432544E18 1.3239072E18 f2a07edb-762d-402... ea470cf4-5c0b-454... US release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.3213888888888889 0 0 0 0 0 \nmain 20150927 1.443222E18 1.217376E18 7cf784f0-ed1d-49f... 901191fd-d975-4d7... GB release Windows 5.1 true 20150917150946 Firefox 41.0 Mozilla daily 16.766666666666666 8 0 0 0 0 \nmain 20150927 1.443222E18 1.4364864E18 98e4f9fc-b813-43c... 93cf50e7-8e9b-4c9... IE release Windows 5.1 true 20150917150946 Firefox 41.0 Mozilla daily 13.645 2 0 0 0 0 \nmain 20150927 1.4433048E18 1.41048E18 639b4743-ec24-485... 8e4eaa9c-64f1-443... FR release Windows 6.1 true 20150917150946 Firefox 41.0 Mozilla shutdown 0.16416666666666666 0 0 0 0 0 \nmain 20150927 1.443222E18 1.4219712E18 0400f102-db0a-47c... f61d10cf-2fdb-400... 
DO beta Windows 6.3 true 20150921151815 Firefox 42.0 Mozilla daily 6.629166666666666 2 0 0 0 0 \nCPU times: user 12 ms, sys: 4 ms, total: 16 ms\nWall time: 3.76 s\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "How many days are there?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 8, "cell_type": "code", "source": "%%time \n\ndates = dataset.select(\"submissionDate\").distinct().collect()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 16 ms, sys: 4 ms, total: 20 ms\nWall time: 58.3 s\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 9, "cell_type": "code", "source": "map(lambda x: x.submissionDate, dates)", "outputs": [{"execution_count": 9, "output_type": "execute_result", "data": {"text/plain": "[u'20150930',\n u'20151001',\n u'20151002',\n u'20151003',\n u'20151004',\n u'20151005',\n u'20151006',\n u'20151007',\n u'20151008',\n u'20151009',\n u'20151010',\n u'20151011',\n u'20151012',\n u'20151013',\n u'20151014',\n u'20151015',\n u'20151016',\n u'20151017',\n u'20151018',\n u'20151019',\n u'20151020',\n u'20151021',\n u'20151022',\n u'20151023',\n u'20151024',\n u'20151025',\n u'20151026',\n u'20151027',\n u'20151028',\n u'20151029',\n u'20151030',\n u'20151031',\n u'20150927',\n u'20150928',\n u'20150929']"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 10, "cell_type": "code", "source": "len(dates)", "outputs": [{"execution_count": 10, "output_type": "execute_result", "data": {"text/plain": "35"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "Now filter on release:", "cell_type": "markdown", "metadata": {}}, {"execution_count": 11, "cell_type": "code", "source": "release_dataset = dataset.filter(dataset[\"channel\"] == \"release\")", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "How many records are there?", "cell_type": "markdown", "metadata": 
{}}, {"execution_count": 12, "cell_type": "code", "source": "%time release_dataset.count()", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 8 ms, sys: 0 ns, total: 8 ms\nWall time: 36 s\n"}, {"execution_count": 12, "output_type": "execute_result", "data": {"text/plain": "683641652L"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}, {"source": "What's the median number of fragments per client over all days?", "cell_type": "markdown", "metadata": {}}, {"execution_count": 13, "cell_type": "code", "source": "%time np.median(release_dataset.groupBy(\"clientId\").count().select(\"count\").collect())", "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 57.2 s, sys: 2.95 s, total: 1min\nWall time: 12min 5s\n"}, {"execution_count": 13, "output_type": "execute_result", "data": {"text/plain": "30.0"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}], "nbformat": 4, "metadata": {"kernelspec": {"display_name": "Python 2", "name": "python2", "language": "python"}, "language_info": {"mimetype": "text/x-python", "nbconvert_exporter": "python", "version": "2.7.9", "name": "python", "file_extension": ".py", "pygments_lexer": "ipython2", "codemirror_mode": {"version": 2, "name": "ipython"}}}}
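
The last two steps of the notebook (filter on the release channel, then take the median number of fragments per client) can be illustrated without a cluster. The following is a minimal sketch in plain Python 3 using only the standard library, not the notebook's Spark code: the `records` list is made-up data, and `median_fragments_per_client` is a hypothetical helper name introduced here for illustration.

```python
from collections import Counter
from statistics import median

# Made-up (clientId, channel) pairs standing in for rows of the dataset.
records = [
    ("a", "release"), ("a", "release"), ("a", "release"),
    ("b", "release"),
    ("c", "beta"),
    ("d", "release"), ("d", "release"),
]


def median_fragments_per_client(rows, channel="release"):
    """Median number of records per clientId on the given channel."""
    # Counterpart of dataset.filter(dataset["channel"] == "release")
    kept = (client for client, ch in rows if ch == channel)
    # Counterpart of groupBy("clientId").count()
    per_client = Counter(kept)
    # Counterpart of np.median over the collected counts
    return median(per_client.values())


result = median_fragments_per_client(records)
print(result)
```

On the toy data, clients a, b and d have 3, 1 and 2 release records respectively, so the median is 2. In the notebook the same grouping runs on the cluster, and only the per-client counts are collected to the driver before `np.median` is applied.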