Skip to content

Instantly share code, notes, and snippets.

@vitillo
Last active September 11, 2015 18:24
Show Gist options
  • Save vitillo/b61027b824b40d486dc6 to your computer and use it in GitHub Desktop.
Save vitillo/b61027b824b40d486dc6 to your computer and use it in GitHub Desktop.
Parquet test
Display the source blob
Display the rendered blob
Raw
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {"collapsed": false, "scrolled": true, "trusted": true},
   "outputs": [],
   "source": [
    "from pyspark.sql import SQLContext\n",
    "from pyspark.sql.types import *\n",
    "from moztelemetry import get_pings\n",
    "\n",
    "# %matplotlib instead of %pylab: %pylab star-imports numpy, whose `extract`\n",
    "# collides with the extract() function defined in this notebook.\n",
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": "### Prepare dataset"
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "pings = get_pings(sc, app=\"Firefox\", channel=\"nightly\", submission_date=\"20150909\", fraction=1)"
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": [
    "try:\n",
    "    INTEGER_TYPES = (int, long)  # Python 2 has a separate `long` type\n",
    "except NameError:\n",
    "    INTEGER_TYPES = (int,)  # Python 3 merged `long` into `int`\n",
    "\n",
    "\n",
    "def extract(ping):\n",
    "    \"\"\"Flatten one ping into a dict of simple fields for a DataFrame row.\n",
    "\n",
    "    Only simple fields are kept so automatic schema inference works:\n",
    "    every entry of payload[\"info\"] and payload[\"simpleMeasurements\"] except\n",
    "    the keys skipped below, plus payload[\"histograms\"] rendered as a single\n",
    "    string under the \"histograms\" key.\n",
    "\n",
    "    Integer values are converted to float (this includes bools, which are\n",
    "    ints in Python); every other value is passed through unchanged. When a\n",
    "    key appears in both sections, the simpleMeasurements value wins.\n",
    "\n",
    "    The ping itself is left untouched, so applying the function again to\n",
    "    the same record gives the same result.\n",
    "\n",
    "    Raises KeyError if the ping lacks \"payload\" or any of the three\n",
    "    sections read from it.\n",
    "    \"\"\"\n",
    "    payload = ping[\"payload\"]\n",
    "    simple = payload[\"simpleMeasurements\"]\n",
    "    info = payload[\"info\"]\n",
    "\n",
    "    result = {\"histograms\": str(payload[\"histograms\"])}\n",
    "\n",
    "    # (section, keys to leave out) pairs; info goes first so that\n",
    "    # simpleMeasurements overrides it on duplicate keys.\n",
    "    sections = (\n",
    "        (info, (\"asyncPluginInit\",)),\n",
    "        (simple, (\"UITelemetry\", \"addonManager\", \"js\")),\n",
    "    )\n",
    "    for fields, skipped in sections:\n",
    "        for key, value in fields.items():\n",
    "            if key in skipped:\n",
    "                continue\n",
    "            if isinstance(value, INTEGER_TYPES):\n",
    "                result[key] = float(value)\n",
    "            else:\n",
    "                result[key] = value\n",
    "\n",
    "    return result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "subset = pings.map(extract).coalesce(320) # ~ 45 MB per partition"
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "frame = sqlContext.createDataFrame(subset) # Retry if it fails with an exception..."
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": "### Save dataset in parquet files"
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "metadata": {"collapsed": false, "scrolled": true, "trusted": true},
   "outputs": [],
   "source": "frame.save(\"dataset.parquet\", \"parquet\", \"overwrite\")"
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": "### Save dataset in json files"
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "frame.save(\"dataset.json\", \"json\", \"overwrite\")"
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": "### Test json filtering speed"
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {"collapsed": true, "trusted": true},
   "outputs": [],
   "source": "json_file = sqlContext.load(\"dataset.json\", \"json\")"
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "#json_file.save(\"s3n://net-mozaws-prod-us-west-2-pipeline-analysis/rvitillo/parquet-test/dataset.json\")"
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "CPU times: user 616 ms, sys: 0 ns, total: 616 ms\nWall time: 24.8 s\n"
    },
    {
     "data": {"text/plain": "148464L"},
     "execution_count": 66,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": "%%time\njson_file.count()"
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "metadata": {"collapsed": false, "scrolled": true, "trusted": true},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "CPU times: user 664 ms, sys: 0 ns, total: 664 ms\nWall time: 27 s\n"
    },
    {
     "data": {"text/plain": "10567L"},
     "execution_count": 67,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": "%%time\njson_file.filter(json_file.AMI_startup_end > 10000).count()"
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": "### Test parquet filtering speed"
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [],
   "source": "parquet_file = sqlContext.load(\"dataset.parquet\", \"parquet\")"
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "metadata": {"collapsed": true, "trusted": true},
   "outputs": [],
   "source": "#parquet_file.save(\"s3n://net-mozaws-prod-us-west-2-pipeline-analysis/rvitillo/parquet-test/dataset.parquet\")"
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "CPU times: user 40 ms, sys: 0 ns, total: 40 ms\nWall time: 2 s\n"
    },
    {
     "data": {"text/plain": "148464L"},
     "execution_count": 70,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": "%%time \nparquet_file.count()"
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "metadata": {"collapsed": false, "trusted": true},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "CPU times: user 40 ms, sys: 0 ns, total: 40 ms\nWall time: 1.97 s\n"
    },
    {
     "data": {"text/plain": "10567L"},
     "execution_count": 71,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": "%%time\nparquet_file.filter(parquet_file.AMI_startup_end > 10000).count()"
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {"collapsed": true, "trusted": true},
   "outputs": [],
   "source": ""
  }
 ],
 "metadata": {
  "kernelspec": {"display_name": "Python 2", "language": "python", "name": "python2"},
  "language_info": {
   "codemirror_mode": {"name": "ipython", "version": 2},
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment