Apache Spark in Python Running SQL Queries on Spark DataFrames.ipynb
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "__Tutorial 1 - Apache Spark in Python: Running SQL Queries on Spark DataFrames__\n\nThis notebook is designed to introduce some basic concepts and help get you familiar with using Spark in Python.\n\nIn this notebook, we will load and explore the titanic dataset. Specifically, this tutorial covers:\n\n - Loading data in memory\n - Creating SQLContext\n - Creating Spark DataFrame\n - Group data by columns\n - Operating on columns\n - Running SQL Queries from a Spark DataFrame\n\nCreated by John Ryan using IBM Data Science Workbench."
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#First, we import some Python packages that we need:\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.types import *\nfrom pyspark.sql import Row\nfrom pyspark.sql.functions import udf\nfrom pyspark.mllib.linalg import Vectors\nfrom pyspark.ml.param import Param, Params\nfrom pyspark.mllib.regression import LabeledPoint\nfrom pyspark.mllib.stat import Statistics\nfrom pyspark.ml.feature import OneHotEncoder, StringIndexer\nfrom pyspark.mllib.linalg import Vectors\nfrom pyspark.ml.feature import VectorAssembler\nfrom IPython.display import display\nfrom ipywidgets import interact\nimport sys\nimport numpy as np\nimport pandas as pd\nimport time\nimport datetime\nimport matplotlib.pyplot as plt\nimport os.path\n%matplotlib inline",
"execution_count": 47,
"outputs": []
},
{
"metadata": {
"collapsed": true,
"trusted": false
},
"cell_type": "code",
"source": "import pandas as pd\ndata = pd.read_csv('/resources/data/titanic.csv')",
"execution_count": 48,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#view the first four rows of the pandas dataframe \ndata.head()",
"execution_count": 49,
"outputs": [
{
"execution_count": 49,
"output_type": "execute_result",
"data": {
"text/plain": " PassengerId Survived Pclass \\\n0 1 0 3 \n1 2 1 1 \n2 3 1 3 \n3 4 1 1 \n4 5 0 3 \n\n Name Sex Age SibSp \\\n0 Braund, Mr. Owen Harris male 22.0 1 \n1 Cumings, Mrs. John Bradley (Florence Briggs Th... female 38.0 1 \n2 Heikkinen, Miss. Laina female 26.0 0 \n3 Futrelle, Mrs. Jacques Heath (Lily May Peel) female 35.0 1 \n4 Allen, Mr. William Henry male 35.0 0 \n\n Parch Ticket Fare Cabin Embarked \n0 0 A/5 21171 7.2500 NaN S \n1 0 PC 17599 71.2833 C85 C \n2 0 STON/O2. 3101282 7.9250 NaN S \n3 0 113803 53.1000 C123 S \n4 0 373450 8.0500 NaN S ",
"text/html": "<div>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>PassengerId</th>\n <th>Survived</th>\n <th>Pclass</th>\n <th>Name</th>\n <th>Sex</th>\n <th>Age</th>\n <th>SibSp</th>\n <th>Parch</th>\n <th>Ticket</th>\n <th>Fare</th>\n <th>Cabin</th>\n <th>Embarked</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>0</th>\n <td>1</td>\n <td>0</td>\n <td>3</td>\n <td>Braund, Mr. Owen Harris</td>\n <td>male</td>\n <td>22.0</td>\n <td>1</td>\n <td>0</td>\n <td>A/5 21171</td>\n <td>7.2500</td>\n <td>NaN</td>\n <td>S</td>\n </tr>\n <tr>\n <th>1</th>\n <td>2</td>\n <td>1</td>\n <td>1</td>\n <td>Cumings, Mrs. John Bradley (Florence Briggs Th...</td>\n <td>female</td>\n <td>38.0</td>\n <td>1</td>\n <td>0</td>\n <td>PC 17599</td>\n <td>71.2833</td>\n <td>C85</td>\n <td>C</td>\n </tr>\n <tr>\n <th>2</th>\n <td>3</td>\n <td>1</td>\n <td>3</td>\n <td>Heikkinen, Miss. Laina</td>\n <td>female</td>\n <td>26.0</td>\n <td>0</td>\n <td>0</td>\n <td>STON/O2. 3101282</td>\n <td>7.9250</td>\n <td>NaN</td>\n <td>S</td>\n </tr>\n <tr>\n <th>3</th>\n <td>4</td>\n <td>1</td>\n <td>1</td>\n <td>Futrelle, Mrs. Jacques Heath (Lily May Peel)</td>\n <td>female</td>\n <td>35.0</td>\n <td>1</td>\n <td>0</td>\n <td>113803</td>\n <td>53.1000</td>\n <td>C123</td>\n <td>S</td>\n </tr>\n <tr>\n <th>4</th>\n <td>5</td>\n <td>0</td>\n <td>3</td>\n <td>Allen, Mr. William Henry</td>\n <td>male</td>\n <td>35.0</td>\n <td>0</td>\n <td>0</td>\n <td>373450</td>\n <td>8.0500</td>\n <td>NaN</td>\n <td>S</td>\n </tr>\n </tbody>\n</table>\n</div>"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "**Missing Value Inputation**\n\nCounting and filling the missing values for each column."
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Data Munging\n#Checking for missing values in the data\ndata.apply(lambda x: sum(x.isnull()),axis=0)",
"execution_count": 68,
"outputs": [
{
"execution_count": 68,
"output_type": "execute_result",
"data": {
"text/plain": "PassengerId 0\nSurvived 0\nPclass 0\nName 0\nSex 0\nAge 177\nSibSp 0\nParch 0\nTicket 0\nFare 0\nCabin 687\nEmbarked 2\ndtype: int64"
},
"metadata": {}
}
]
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Display of the total Education value counts\ndata['Cabin'].value_counts()",
"execution_count": 72,
"outputs": [
{
"execution_count": 72,
"output_type": "execute_result",
"data": {
"text/plain": "C23 C25 C27 4\nG6 4\nB96 B98 4\nD 3\nC22 C26 3\nE101 3\nF2 3\nF33 3\nB57 B59 B63 B66 2\nC68 2\nB58 B60 2\nE121 2\nD20 2\nE8 2\nE44 2\nB77 2\nC65 2\nD26 2\nE24 2\nE25 2\nB20 2\nC93 2\nD33 2\nE67 2\nD35 2\nD36 2\nC52 2\nF4 2\nC125 2\nC124 2\n ..\nF G63 1\nA6 1\nD45 1\nD6 1\nD56 1\nC101 1\nC54 1\nD28 1\nD37 1\nB102 1\nD30 1\nE17 1\nE58 1\nF E69 1\nD10 D12 1\nE50 1\nA14 1\nC91 1\nA16 1\nB38 1\nB39 1\nC95 1\nB78 1\nB79 1\nC99 1\nB37 1\nA19 1\nE12 1\nA7 1\nD15 1\nName: Cabin, dtype: int64"
},
"metadata": {}
}
]
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Fill in the NaN values with the most common cabin type in the data\ndata['Cabin'].fillna('G6',inplace=True)",
"execution_count": 74,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Display of the total Embarked value counts\ndata['Embarked'].value_counts()",
"execution_count": 75,
"outputs": [
{
"execution_count": 75,
"output_type": "execute_result",
"data": {
"text/plain": "S 644\nC 168\nQ 77\nName: Embarked, dtype: int64"
},
"metadata": {}
}
]
},
{
"metadata": {
"collapsed": true,
"trusted": false
},
"cell_type": "code",
"source": "#Fill in the NaN values with the most common Embarked type in the data\ndata['Embarked'].fillna('s',inplace=True)",
"execution_count": 76,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Display of the total Age value counts\ndata['Age'].value_counts()",
"execution_count": 78,
"outputs": [
{
"execution_count": 78,
"output_type": "execute_result",
"data": {
"text/plain": "24.00 30\n22.00 27\n18.00 26\n30.00 25\n28.00 25\n19.00 25\n21.00 24\n25.00 23\n36.00 22\n29.00 20\n32.00 18\n27.00 18\n35.00 18\n26.00 18\n16.00 17\n31.00 17\n23.00 15\n34.00 15\n33.00 15\n20.00 15\n39.00 14\n17.00 13\n40.00 13\n42.00 13\n45.00 12\n38.00 11\n50.00 10\n2.00 10\n4.00 10\n47.00 9\n ..\n28.50 2\n40.50 2\n63.00 2\n13.00 2\n10.00 2\n45.50 2\n70.00 2\n30.50 2\n71.00 2\n59.00 2\n57.00 2\n55.00 2\n0.75 2\n64.00 2\n23.50 1\n14.50 1\n0.67 1\n53.00 1\n0.92 1\n0.42 1\n70.50 1\n36.50 1\n80.00 1\n66.00 1\n74.00 1\n12.00 1\n55.50 1\n34.50 1\n24.50 1\n20.50 1\nName: Age, dtype: int64"
},
"metadata": {}
}
]
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "data['Age'].fillna(24.00)",
"execution_count": 79,
"outputs": [
{
"execution_count": 79,
"output_type": "execute_result",
"data": {
"text/plain": "0 22.0\n1 38.0\n2 26.0\n3 35.0\n4 35.0\n5 24.0\n6 54.0\n7 2.0\n8 27.0\n9 14.0\n10 4.0\n11 58.0\n12 20.0\n13 39.0\n14 14.0\n15 55.0\n16 2.0\n17 24.0\n18 31.0\n19 24.0\n20 35.0\n21 34.0\n22 15.0\n23 28.0\n24 8.0\n25 38.0\n26 24.0\n27 19.0\n28 24.0\n29 24.0\n ... \n861 21.0\n862 48.0\n863 24.0\n864 24.0\n865 42.0\n866 27.0\n867 31.0\n868 24.0\n869 4.0\n870 26.0\n871 47.0\n872 33.0\n873 47.0\n874 28.0\n875 15.0\n876 20.0\n877 19.0\n878 24.0\n879 56.0\n880 25.0\n881 33.0\n882 22.0\n883 28.0\n884 25.0\n885 39.0\n886 27.0\n887 19.0\n888 24.0\n889 26.0\n890 32.0\nName: Age, dtype: float64"
},
"metadata": {}
}
]
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "data = data.fillna(data.mean())",
"execution_count": 84,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Checking for missing values in the data\ndata.apply(lambda x: sum(x.isnull()),axis=0)",
"execution_count": 85,
"outputs": [
{
"execution_count": 85,
"output_type": "execute_result",
"data": {
"text/plain": "PassengerId 0\nSurvived 0\nPclass 0\nName 0\nSex 0\nAge 0\nSibSp 0\nParch 0\nTicket 0\nFare 0\nCabin 0\nEmbarked 0\ndtype: int64"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": " __Create a Spark DataFrame__\n \n In order to work with Spark Data Frames we need to initialize SQLContext"
},
{
"metadata": {
"collapsed": true,
"trusted": false
},
"cell_type": "code",
"source": "# the spark SQL Context is created using SQLContext(sc)\nsqlContext = SQLContext(sc)",
"execution_count": 86,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#With the combination of the SQLContext and the pandas dataframe we can now create the \n#spark dataframe\nsdata = sqlContext.createDataFrame(data)\nsdata.printSchema()#prints out the data schema for the new data frame",
"execution_count": 87,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "root\n |-- PassengerId: long (nullable = true)\n |-- Survived: long (nullable = true)\n |-- Pclass: long (nullable = true)\n |-- Name: string (nullable = true)\n |-- Sex: string (nullable = true)\n |-- Age: double (nullable = true)\n |-- SibSp: long (nullable = true)\n |-- Parch: long (nullable = true)\n |-- Ticket: string (nullable = true)\n |-- Fare: double (nullable = true)\n |-- Cabin: string (nullable = true)\n |-- Embarked: string (nullable = true)\n\n"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "__Selecting Data Columns__"
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Taking a look at some of the columns\nsdata.select('Fare','Ticket', 'Survived').show()",
"execution_count": 88,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "+-------+----------------+--------+\n| Fare| Ticket|Survived|\n+-------+----------------+--------+\n| 7.25| A/5 21171| 0|\n|71.2833| PC 17599| 1|\n| 7.925|STON/O2. 3101282| 1|\n| 53.1| 113803| 1|\n| 8.05| 373450| 0|\n| 8.4583| 330877| 0|\n|51.8625| 17463| 0|\n| 21.075| 349909| 0|\n|11.1333| 347742| 1|\n|30.0708| 237736| 1|\n| 16.7| PP 9549| 1|\n| 26.55| 113783| 1|\n| 8.05| A/5. 2151| 0|\n| 31.275| 347082| 0|\n| 7.8542| 350406| 0|\n| 16.0| 248706| 1|\n| 29.125| 382652| 0|\n| 13.0| 244373| 1|\n| 18.0| 345763| 0|\n| 7.225| 2649| 1|\n+-------+----------------+--------+\nonly showing top 20 rows\n\n"
}
]
},
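{
"metadata": {},
"cell_type": "markdown",
"source": "__Operating on Columns__\n\nThe outline above lists operating on columns. As an added illustrative sketch (not part of the original run, so no output is shown), a derived column can be built with withColumn; family_size is a hypothetical column name combining the SibSp and Parch columns."
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Added illustrative sketch: derive a hypothetical family_size column from existing columns\n#(siblings/spouses + parents/children + the passenger)\nsdata.withColumn('family_size', sdata['SibSp'] + sdata['Parch'] + 1)\\\n.select('SibSp', 'Parch', 'family_size')\\\n.show(5)",
"execution_count": null,
"outputs": []
},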
{
"metadata": {},
"cell_type": "markdown",
"source": "__Grouping by and Aggregation__\n\nWhen using the sql group by command we can dig further into the data and group all titanic survivers and non survivers by the average fair paid. "
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Grouping by survivor column and aggregating the fare column to return the average fare.\nsdata.groupby(['Survived'])\\\n.agg({\"Fare\": \"AVG\"})\\\n.show(5)",
"execution_count": 89,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "+--------+-----------------+\n|Survived| avg(Fare)|\n+--------+-----------------+\n| 0| 22.1178868852459|\n| 1|48.39540760233918|\n+--------+-----------------+\n\n"
}
]
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#using sql group by command we can dig further into the data to aggregate and sort. \nsdata.groupby(['Survived'])\\\n.agg({\"Fare\": \"count\"})\\\n.sort(\"count(Fare)\", ascending=True)\\\n.show(5)",
"execution_count": 90,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "+--------+-----------+\n|Survived|count(Fare)|\n+--------+-----------+\n| 1| 342|\n| 0| 549|\n+--------+-----------+\n\n"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "__SQL Queries using Spark__"
},
{
"metadata": {
"collapsed": true,
"trusted": false
},
"cell_type": "code",
"source": "#Step 1 convert spark dataframe to a table\nsdata.registerTempTable(\"titanic\")",
"execution_count": 91,
"outputs": []
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#use the SQL method part of SQLcontext to query the table\nquery1 = sqlContext.sql(\"SELECT Ticket FROM titanic WHERE Fare < 1000\")\nquery1.show()",
"execution_count": 92,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "+----------------+\n| Ticket|\n+----------------+\n| A/5 21171|\n| PC 17599|\n|STON/O2. 3101282|\n| 113803|\n| 373450|\n| 330877|\n| 17463|\n| 349909|\n| 347742|\n| 237736|\n| PP 9549|\n| 113783|\n| A/5. 2151|\n| 347082|\n| 350406|\n| 248706|\n| 382652|\n| 244373|\n| 345763|\n| 2649|\n+----------------+\nonly showing top 20 rows\n\n"
}
]
}
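,
{
"metadata": {},
"cell_type": "markdown",
"source": "As a further illustrative sketch (added to this tutorial and not executed above, so no output is shown), the same temporary table can serve aggregate queries, for example the average fare per passenger class; the names query2 and avg_fare are illustrative."
},
{
"metadata": {
"collapsed": false,
"trusted": false
},
"cell_type": "code",
"source": "#Added illustrative sketch: an aggregate SQL query against the registered titanic table\nquery2 = sqlContext.sql(\"SELECT Pclass, AVG(Fare) AS avg_fare FROM titanic GROUP BY Pclass\")\nquery2.show()",
"execution_count": null,
"outputs": []
}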
],
"metadata": {
"kernelspec": {
"name": "python2",
"display_name": "Python 2",
"language": "python"
},
"widgets": {
"state": {},
"version": "1.1.2"
},
"language_info": {
"mimetype": "text/x-python",
"nbconvert_exporter": "python",
"name": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11",
"file_extension": ".py",
"codemirror_mode": {
"version": 2,
"name": "ipython"
}
},
"gist": {
"id": "",
"data": {
"description": "Apache Spark in Python Running SQL Queries on Spark DataFrames.ipynb",
"public": true
}
}
},
"nbformat": 4,
"nbformat_minor": 0
}