@cirla
Created March 24, 2016 22:56
Analyzing Philadelphia 2012 General Election Results with PySpark
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PySpark Demo\n",
"## Philly PUG 2016-03-24\n",
"\n",
"To run this notebook, launch PySpark with Jupyter as the driver and the spark-csv package loaded:\n",
"\n",
"`PYSPARK_DRIVER_PYTHON=\"jupyter\" PYSPARK_DRIVER_PYTHON_OPTS=\"notebook\" $SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.4.0`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"## Analyzing Philadelphia 2012 General Election Results with PySpark\n",
"\n",
"Using http://www.analyzethevote.com/download/2012_GENERAL.txt\n",
"\n",
"Let's start by looking at the first few lines of the file:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[u'WARD\\tDIVISION\\tTYPE\\tOFFICE\\tNAME\\tPARTY\\tVOTES',\n",
" u'01\\t01\\tA\\tATTORNEY GENERAL\\tDAVID J FREED\\tREPUBLICAN\\t2.00',\n",
" u'01\\t01\\tA\\tATTORNEY GENERAL\\tKATHLEEN G KANE\\tDEMOCRATIC\\t6.00',\n",
" u'01\\t01\\tA\\tATTORNEY GENERAL\\tMARAKAY J ROGERS\\tLIBERTARIAN\\t0.00',\n",
" u'01\\t01\\tA\\tATTORNEY GENERAL\\tWrite In\\t\\t0.00']"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lines = sc.textFile('/home/vagrant/data/2012_GENERAL.txt')\n",
"lines.take(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The file is tab-separated (TSV), with a header row first. Let's split each line on tabs:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[[u'WARD', u'DIVISION', u'TYPE', u'OFFICE', u'NAME', u'PARTY', u'VOTES'],\n",
" [u'01',\n",
" u'01',\n",
" u'A',\n",
" u'ATTORNEY GENERAL',\n",
" u'DAVID J FREED',\n",
" u'REPUBLICAN',\n",
" u'2.00']]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"electionData = lines.map(lambda line: line.split('\\t'))\n",
"electionData.take(2)"
]
},
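{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick standalone check in plain Python (no Spark needed), using the `Write In` line from the first output above: `split('\\t')` keeps empty fields, so a blank PARTY still occupies its own column and the fixed column indices used below stay aligned."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# One raw line copied from the first output above; its PARTY field is blank.\n",
"raw = u'01\\t01\\tA\\tATTORNEY GENERAL\\tWrite In\\t\\t0.00'\n",
"fields = raw.split(u'\\t')\n",
"\n",
"# Still seven fields, with an empty string in the PARTY position.\n",
"assert len(fields) == 7\n",
"assert fields[5] == u''\n",
"fields"
]
},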
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's keep only the presidential votes:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[[u'01',\n",
" u'01',\n",
" u'A',\n",
" u'PRESIDENT AND VICE PRESIDENT OF THE UNITED STATES',\n",
" u'BARACK OBAMA',\n",
" u'DEMOCRATIC',\n",
" u'8.00']]"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"OFFICE_COL = 3\n",
"PRES_OFFICE = \"PRESIDENT AND VICE PRESIDENT OF THE UNITED STATES\"\n",
"\n",
"presidentialVotes = electionData.filter(lambda data: data[OFFICE_COL] == PRES_OFFICE)\n",
"presidentialVotes.take(1)"
]
},
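{
"cell_type": "markdown",
"metadata": {},
"source": [
"The filter above also happens to discard the header row, because the header's OFFICE field is the string `OFFICE`. A more explicit way to drop a header is to grab it with `first()` and filter it out. Here is a minimal sketch on a tiny RDD built with `sc.parallelize` from two lines shown earlier. The same pattern would apply to `lines`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Tiny stand-in for `lines`: the header plus one data row from the first output above.\n",
"sample = sc.parallelize([\n",
"    u'WARD\\tDIVISION\\tTYPE\\tOFFICE\\tNAME\\tPARTY\\tVOTES',\n",
"    u'01\\t01\\tA\\tATTORNEY GENERAL\\tDAVID J FREED\\tREPUBLICAN\\t2.00',\n",
"])\n",
"\n",
"header = sample.first()  # the first element is the header line\n",
"rows = (sample.filter(lambda line: line != header)\n",
"              .map(lambda line: line.split(u'\\t')))\n",
"\n",
"assert rows.count() == 1\n",
"rows.collect()"
]
},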
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's map each row to a `(name, votes)` tuple, converting the vote count from a string to a float, so that `reduceByKey` can sum the votes for each name."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(u'GARY JOHNSON', 2892.0)]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import operator\n",
"\n",
"NAME_COL = 4\n",
"VOTES_COL = 6\n",
"\n",
"presidentialTotals = presidentialVotes.map(lambda data: (data[NAME_COL], float(data[VOTES_COL])))\\\n",
" .reduceByKey(operator.add)\n",
"presidentialTotals.take(1)"
]
},
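{
"cell_type": "markdown",
"metadata": {},
"source": [
"Conceptually, `reduceByKey(operator.add)` folds together all values that share a key. Below is a plain-Python sketch of those semantics on a single machine, using a hypothetical helper `reduce_by_key` and hypothetical toy pairs (not taken from the dataset). Spark additionally combines values within each partition before shuffling them across the cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import operator\n",
"\n",
"def reduce_by_key(pairs, func):\n",
"    # Hypothetical helper mirroring reduceByKey: fold the values of each key with func.\n",
"    totals = {}\n",
"    for key, value in pairs:\n",
"        if key in totals:\n",
"            totals[key] = func(totals[key], value)\n",
"        else:\n",
"            totals[key] = value\n",
"    return totals\n",
"\n",
"# Hypothetical toy (name, votes) pairs, e.g. one pair per division.\n",
"pairs = [(u'A', 2.0), (u'B', 6.0), (u'A', 1.0)]\n",
"totals = reduce_by_key(pairs, operator.add)\n",
"assert totals == {u'A': 3.0, u'B': 6.0}\n",
"sorted(totals.items())"
]
},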
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we'll sort by vote count in descending order and collect everything back to the driver as a plain old Python list:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[(u'BARACK OBAMA', 588806.0),\n",
" (u'MITT ROMNEY', 96467.0),\n",
" (u'GARY JOHNSON', 2892.0),\n",
" (u'JILL STEIN', 2162.0),\n",
" (u'Write In', 449.0)]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"presidentialTotals.sortBy(operator.itemgetter(1), ascending=False).collect()"
]
},
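{
"cell_type": "markdown",
"metadata": {},
"source": [
"`sortBy` followed by `collect()` sorts and returns the whole RDD. When only the leaders are needed, `takeOrdered(n, key=...)` brings back just the `n` smallest elements by key. Negating the vote count therefore gives the top `n`. Here is a sketch on a small RDD rebuilt from the totals printed above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Small RDD rebuilt from the totals printed above.\n",
"totals = sc.parallelize([\n",
"    (u'BARACK OBAMA', 588806.0),\n",
"    (u'MITT ROMNEY', 96467.0),\n",
"    (u'GARY JOHNSON', 2892.0),\n",
"    (u'JILL STEIN', 2162.0),\n",
"    (u'Write In', 449.0),\n",
"])\n",
"\n",
"# takeOrdered returns the n smallest elements by key; negate votes for descending order.\n",
"top2 = totals.takeOrdered(2, key=lambda kv: -kv[1])\n",
"assert [name for name, _ in top2] == [u'BARACK OBAMA', u'MITT ROMNEY']\n",
"top2"
]
},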
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Exploring DataFrames\n",
"\n",
"First, we create a `SQLContext`, the entry point for DataFrames and SQL in Spark 1.x:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<pyspark.sql.context.SQLContext at 0x7f9a36727250>"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import SQLContext\n",
"sql = SQLContext(sc)\n",
"sql"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can use the spark-csv package (https://github.com/databricks/spark-csv) to load the TSV into a Spark DataFrame, taking column names from the header row and inferring column types:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- WARD: integer (nullable = true)\n",
" |-- DIVISION: integer (nullable = true)\n",
" |-- TYPE: string (nullable = true)\n",
" |-- OFFICE: string (nullable = true)\n",
" |-- NAME: string (nullable = true)\n",
" |-- PARTY: string (nullable = true)\n",
" |-- VOTES: double (nullable = true)\n",
"\n"
]
}
],
"source": [
"df = sql.read.format('com.databricks.spark.csv')\\\n",
" .options(header='true', inferschema='true', delimiter='\\t')\\\n",
" .load('/home/vagrant/data/2012_GENERAL.txt')\n",
"df.printSchema()"
]
},
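{
"cell_type": "markdown",
"metadata": {},
"source": [
"On Spark 2.0 and later, CSV reading is built in and `SparkSession` is the usual entry point, so the spark-csv package is no longer required. Below is a sketch of the equivalent load, assuming Spark 2.0+. It will not run on the Spark 1.x setup used in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Assumes Spark 2.0+, where SparkSession supersedes SQLContext.\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# Built-in CSV reader; same file and options as the spark-csv load above.\n",
"df2 = spark.read.csv('/home/vagrant/data/2012_GENERAL.txt',\n",
"                     sep='\\t', header=True, inferSchema=True)\n",
"df2.printSchema()"
]
},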
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice the automatically inferred schema: WARD and DIVISION became integers and VOTES became a double.\n",
"\n",
"The DataFrame API is very SQL-like and can express the same computation we did above with RDDs:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+-----------+\n",
"| NAME|TOTAL VOTES|\n",
"+--------------+-----------+\n",
"|BARACK OBAMA| 588806.0|\n",
"| MITT ROMNEY| 96467.0|\n",
"|GARY JOHNSON| 2892.0|\n",
"| JILL STEIN| 2162.0|\n",
"| Write In| 449.0|\n",
"+--------------+-----------+\n",
"\n"
]
}
],
"source": [
"from pyspark.sql import functions\n",
"\n",
"df.filter(df['OFFICE'] == PRES_OFFICE)\\\n",
" .select('NAME', 'VOTES')\\\n",
" .groupBy('NAME')\\\n",
" .agg(functions.sum('VOTES').alias('TOTAL VOTES'))\\\n",
" .orderBy(functions.desc('TOTAL VOTES'))\\\n",
" .show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or we can register the DataFrame as a temporary table and query it with plain SQL:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df.registerTempTable(\"voting_data\")"
]
},
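{
"cell_type": "markdown",
"metadata": {},
"source": [
"On Spark 2.0 and later, `registerTempTable` is deprecated in favor of `createOrReplaceTempView`, which does the same job. Here is a one-line sketch, assuming Spark 2.0+ and the `df` loaded above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Assumes Spark 2.0+: registers (or replaces) a session-scoped view queryable from SQL.\n",
"df.createOrReplaceTempView('voting_data')"
]
},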
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+-----------+\n",
"| NAME|TOTAL_VOTES|\n",
"+--------------+-----------+\n",
"|BARACK OBAMA| 588806.0|\n",
"| MITT ROMNEY| 96467.0|\n",
"|GARY JOHNSON| 2892.0|\n",
"| JILL STEIN| 2162.0|\n",
"| Write In| 449.0|\n",
"+--------------+-----------+\n",
"\n"
]
}
],
"source": [
"sql.sql(\"\"\"\n",
"SELECT\n",
" NAME,\n",
" SUM(VOTES) AS TOTAL_VOTES\n",
"FROM voting_data\n",
"WHERE OFFICE = 'PRESIDENT AND VICE PRESIDENT OF THE UNITED STATES'\n",
"GROUP BY NAME\n",
"ORDER BY TOTAL_VOTES DESC\n",
"\"\"\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And that's it for now. Thanks!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 0
}