Coursera - Big Data, Spark SQL
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <img src=\"https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/images/IDSNlogo.png\" width=\"300\" alt=\"cognitiveclass.ai logo\" />\n",
"</center>\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# **Introduction to SparkSQL**\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Estimated time needed: **15** minutes\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This lab goes over the basic operations of Apache SparkSQL.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![](http://spark.apache.org/images/spark-logo.png)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Objectives\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Spark SQL is a Spark module for structured data processing. It is sed to query structured data inside Spark programs, using either SQL or a familiar DataFrame API.\n",
"\n",
"After completing this lab you will be able to:\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* Load a data file into a dataframe\n",
"* Create a Table View for the dataframe\n",
"* Run basic SQL queries and aggregate data on the table view\n",
"* Create a Pandas UDF to perform columnar operations\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"***\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this lab, we are going to be using Python and Spark (PySpark). These libraries should be installed in your lab environment or in SN Labs. Pandas is a popular data science package for Python. In this lab, we use Pandas to load a CSV file from disc to a pandas dataframe in memory. PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting pyspark\n",
" Downloading pyspark-3.2.0.tar.gz (281.3 MB)\n",
" |████████████████████████████████| 281.3 MB 14 kB/s | 21.3 MB 31.4 MB/s eta 0:00:09\n",
"\u001b[?25h Preparing metadata (setup.py) ... \u001b[?25ldone\n",
"\u001b[?25hCollecting py4j==0.10.9.2\n",
" Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)\n",
" |████████████████████████████████| 198 kB 67.8 MB/s \n",
"\u001b[?25hBuilding wheels for collected packages: pyspark\n",
" Building wheel for pyspark (setup.py) ... \u001b[?25ldone\n",
"\u001b[?25h Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=cb3632fcea1bf18aada3a7ccedb2c4533d69366ec368b42deb4c974e2e82822c\n",
" Stored in directory: /home/jupyterlab/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718\n",
"Successfully built pyspark\n",
"Installing collected packages: py4j, pyspark\n",
"Successfully installed py4j-0.10.9.2 pyspark-3.2.0\n",
"Collecting findspark\n",
" Downloading findspark-2.0.0-py2.py3-none-any.whl (4.4 kB)\n",
"Installing collected packages: findspark\n",
"Successfully installed findspark-2.0.0\n",
"Collecting pyarrow==1.0.0\n",
" Downloading pyarrow-1.0.0-cp37-cp37m-manylinux2014_x86_64.whl (17.2 MB)\n",
" |████████████████████████████████| 17.2 MB 36.2 MB/s \n",
"\u001b[?25hRequirement already satisfied: numpy>=1.14 in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (from pyarrow==1.0.0) (1.21.4)\n",
"Installing collected packages: pyarrow\n",
"Successfully installed pyarrow-1.0.0\n",
"Requirement already satisfied: pandas in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (1.3.4)\n",
"Requirement already satisfied: python-dateutil>=2.7.3 in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (from pandas) (2.8.2)\n",
"Requirement already satisfied: pytz>=2017.3 in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (from pandas) (2021.3)\n",
"Requirement already satisfied: numpy>=1.17.3 in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (from pandas) (1.21.4)\n",
"Requirement already satisfied: six>=1.5 in /home/jupyterlab/conda/envs/python/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas) (1.16.0)\n",
"Collecting numpy==1.19.5\n",
" Downloading numpy-1.19.5-cp37-cp37m-manylinux2010_x86_64.whl (14.8 MB)\n",
" |████████████████████████████████| 14.8 MB 25.7 MB/s \n",
"\u001b[?25hInstalling collected packages: numpy\n",
" Attempting uninstall: numpy\n",
" Found existing installation: numpy 1.21.4\n",
" Uninstalling numpy-1.21.4:\n",
" Successfully uninstalled numpy-1.21.4\n",
"Successfully installed numpy-1.19.5\n"
]
}
],
"source": [
"# Installing required packages\n",
"!pip install pyspark\n",
"!pip install findspark\n",
"!pip install pyarrow==1.0.0\n",
"!pip install pandas\n",
"!pip install numpy==1.19.5"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import findspark\n",
"findspark.init()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from pyspark import SparkContext, SparkConf\n",
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Exercise 1 - Spark session\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create and initialize the Spark session needed to load the data frames and operate on it\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 1: Creating the spark session and context\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"SLF4J: Class path contains multiple SLF4J bindings.\n",
"SLF4J: Found binding in [jar:file:/home/jupyterlab/conda/envs/python/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
"SLF4J: Found binding in [jar:file:/home/jupyterlab/hadoop-2.9.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
"SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\n",
"SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"22/01/25 00:51:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
}
],
"source": [
"# Creating a spark context class\n",
"sc = SparkContext()\n",
"\n",
"# Creating a spark session\n",
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"Python Spark DataFrames basic example\") \\\n",
" .config(\"spark.some.config.option\", \"some-value\") \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 2: Initialize Spark session\n",
"\n",
"To work with dataframes we just need to verify that the spark session instance has been created.\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - in-memory</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://jupyterlab-grace123:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.2.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>pyspark-shell</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x7f065b866110>"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Exercise 2 - Loading the Data and creating a table view\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this section, you will first read the CSV file into a Pandas Dataframe and then read it into a Spark Dataframe\n",
"Pandas is a library used for data manipulation and analysis. The Pandas library offers data structures and operations for creating and manipulating Data Series and DataFrame objects. Data can be imported from various data sources, e.g., Numpy arrays, Python dictionaries, and CSV files. Pandas allows you to manipulate, organize and display the data.\n",
"\n",
"To create a Spark DataFrame we load an external DataFrame, called `mtcars`. This DataFrame includes 32 observations on 11 variables:\n",
"\n",
"| colIndex | colName | units/description |\n",
"| :------: | :------ | :--------------------------------------- |\n",
"| [, 1] | mpg | Miles per gallon |\n",
"| [, 2] | cyl | Number of cylinders |\n",
"| [, 3] | disp | Displacement (cu.in.) |\n",
"| [, 4] | hp | Gross horsepower |\n",
"| [, 5] | drat | Rear axle ratio |\n",
"| [, 6] | wt | Weight (lb/1000) |\n",
"| [, 7] | qsec | 1/4 mile time |\n",
"| [, 8] | vs | V/S |\n",
"| [, 9] | am | Transmission (0 = automatic, 1 = manual) |\n",
"| [,10] | gear | Number of forward gears |\n",
"| [,11] | carb | Number of carburetors |\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 1: Load data into a Pandas DataFrame.\n",
"\n",
"Pandas has a convenient function to load CSV data from a URL directly into a pandas dataframe.\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Read the file using `read_csv` function in pandas\n",
"mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Unnamed: 0</th>\n",
" <th>mpg</th>\n",
" <th>cyl</th>\n",
" <th>disp</th>\n",
" <th>hp</th>\n",
" <th>drat</th>\n",
" <th>wt</th>\n",
" <th>qsec</th>\n",
" <th>vs</th>\n",
" <th>am</th>\n",
" <th>gear</th>\n",
" <th>carb</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>Mazda RX4</td>\n",
" <td>21.0</td>\n",
" <td>6</td>\n",
" <td>160.0</td>\n",
" <td>110</td>\n",
" <td>3.90</td>\n",
" <td>2.620</td>\n",
" <td>16.46</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>4</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>Mazda RX4 Wag</td>\n",
" <td>21.0</td>\n",
" <td>6</td>\n",
" <td>160.0</td>\n",
" <td>110</td>\n",
" <td>3.90</td>\n",
" <td>2.875</td>\n",
" <td>17.02</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>4</td>\n",
" <td>4</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>Datsun 710</td>\n",
" <td>22.8</td>\n",
" <td>4</td>\n",
" <td>108.0</td>\n",
" <td>93</td>\n",
" <td>3.85</td>\n",
" <td>2.320</td>\n",
" <td>18.61</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>4</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>Hornet 4 Drive</td>\n",
" <td>21.4</td>\n",
" <td>6</td>\n",
" <td>258.0</td>\n",
" <td>110</td>\n",
" <td>3.08</td>\n",
" <td>3.215</td>\n",
" <td>19.44</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>3</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>Hornet Sportabout</td>\n",
" <td>18.7</td>\n",
" <td>8</td>\n",
" <td>360.0</td>\n",
" <td>175</td>\n",
" <td>3.15</td>\n",
" <td>3.440</td>\n",
" <td>17.02</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>3</td>\n",
" <td>2</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Unnamed: 0 mpg cyl disp hp drat wt qsec vs am gear \\\n",
"0 Mazda RX4 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 \n",
"1 Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 \n",
"2 Datsun 710 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 \n",
"3 Hornet 4 Drive 21.4 6 258.0 110 3.08 3.215 19.44 1 0 3 \n",
"4 Hornet Sportabout 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 \n",
"\n",
" carb \n",
"0 4 \n",
"1 4 \n",
"2 1 \n",
"3 1 \n",
"4 2 "
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Preview a few records\n",
"mtcars.head()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 2: Loading data into a Spark DataFrame\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We use the `createDataFrame` function to load the data into a spark dataframe\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"sdf = spark.createDataFrame(mtcars) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let us look at the schema of the loaded spark dataframe\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: string (nullable = true)\n",
" |-- mpg: double (nullable = true)\n",
" |-- cyl: long (nullable = true)\n",
" |-- disp: double (nullable = true)\n",
" |-- hp: long (nullable = true)\n",
" |-- drat: double (nullable = true)\n",
" |-- wt: double (nullable = true)\n",
" |-- qsec: double (nullable = true)\n",
" |-- vs: long (nullable = true)\n",
" |-- am: long (nullable = true)\n",
" |-- gear: long (nullable = true)\n",
" |-- carb: long (nullable = true)\n",
"\n"
]
}
],
"source": [
"sdf.printSchema()"
]
},
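{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an aside, Spark can also read a CSV file directly into a DataFrame with `spark.read.csv`, skipping the pandas round-trip on the read side. A minimal sketch, writing the pandas DataFrame to a local file first (the file name `mtcars.csv` and the variable `sdf_direct` are just illustrative):\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: persist the pandas DataFrame to disk, then let Spark read it back.\n",
"# header=True uses the first row as column names;\n",
"# inferSchema=True asks Spark to detect each column's type.\n",
"mtcars.to_csv(\"mtcars.csv\", index=False)\n",
"sdf_direct = spark.read.csv(\"mtcars.csv\", header=True, inferSchema=True)\n",
"sdf_direct.printSchema()"
]
},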
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 3: Create a Table View\n",
"\n",
"Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table to run SQL queries. A Temporary view provides local scope within the current Spark session. In this example we create a temporary view using the `createTempView()` function\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"sdf.createTempView(\"cars\")"
]
},
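{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `createTempView` raises an error if a view named `cars` already exists, for example when re-running the cell above. A minimal sketch of the alternative, `createOrReplaceTempView`, which overwrites any existing view of the same name:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Overwrites the \"cars\" view if it already exists, so this cell is safe to re-run\n",
"sdf.createOrReplaceTempView(\"cars\")"
]
},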
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Exercise 3 - Running SQL queries and aggregating data\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once we have a table view, we can run queries similar to querying a SQL table. We perform similar operations to the ones in the DataFrames notebook. Note the difference here however is that we use the SQL queries directly.\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|\n",
"| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|\n",
"| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|\n",
"| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|\n",
"| Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|\n",
"| Valiant|18.1| 6|225.0|105|2.76| 3.46|20.22| 1| 0| 3| 1|\n",
"| Duster 360|14.3| 8|360.0|245|3.21| 3.57|15.84| 0| 0| 3| 4|\n",
"| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2|\n",
"| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2|\n",
"| Merc 280|19.2| 6|167.6|123|3.92| 3.44| 18.3| 1| 0| 4| 4|\n",
"| Merc 280C|17.8| 6|167.6|123|3.92| 3.44| 18.9| 1| 0| 4| 4|\n",
"| Merc 450SE|16.4| 8|275.8|180|3.07| 4.07| 17.4| 0| 0| 3| 3|\n",
"| Merc 450SL|17.3| 8|275.8|180|3.07| 3.73| 17.6| 0| 0| 3| 3|\n",
"| Merc 450SLC|15.2| 8|275.8|180|3.07| 3.78| 18.0| 0| 0| 3| 3|\n",
"| Cadillac Fleetwood|10.4| 8|472.0|205|2.93| 5.25|17.98| 0| 0| 3| 4|\n",
"|Lincoln Continental|10.4| 8|460.0|215| 3.0|5.424|17.82| 0| 0| 3| 4|\n",
"| Chrysler Imperial|14.7| 8|440.0|230|3.23|5.345|17.42| 0| 0| 3| 4|\n",
"| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1|\n",
"| Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2|\n",
"| Toyota Corolla|33.9| 4| 71.1| 65|4.22|1.835| 19.9| 1| 1| 4| 1|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"# Showing the whole table\n",
"spark.sql(\"SELECT * FROM cars\").show()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+\n",
"| mpg|\n",
"+----+\n",
"|21.0|\n",
"|21.0|\n",
"|22.8|\n",
"|21.4|\n",
"|18.7|\n",
"+----+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"# Showing a specific column\n",
"spark.sql(\"SELECT mpg FROM cars\").show(5)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|\n",
"+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|\n",
"| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2|\n",
"| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2|\n",
"| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1|\n",
"|Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2|\n",
"+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"# Basic filtering query to determine cars that have a high mileage and low cylinder count\n",
"spark.sql(\"SELECT * FROM cars where mpg>20 AND cyl < 6\").show(5)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 8:==========> (3 + 13) / 16]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---+\n",
"|count(1)|cyl|\n",
"+--------+---+\n",
"| 7| 6|\n",
"| 11| 4|\n",
"| 14| 8|\n",
"+--------+---+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Aggregating data and grouping by cylinders\n",
"spark.sql(\"SELECT count(*), cyl from cars GROUP BY cyl\").show()"
]
},
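{
"cell_type": "markdown",
"metadata": {},
"source": [
"For comparison, the same aggregation can be expressed with the DataFrame API instead of SQL; Spark optimizes both forms into the same kind of execution plan. A minimal sketch:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Equivalent group-by count using the DataFrame API\n",
"sdf.groupBy(\"cyl\").count().show()"
]
},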
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Exercise 4 - Create a Pandas UDF to apply a columnar operation\n",
"\n",
"Apache Spark has become the de-facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDF). These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.\n",
"\n",
"Pandas UDFs built on top of Apache Arrow bring you the *best of both worlds*—the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we will build a Scalar Pandas UDF to convert the wT column from imperial units (1000-lbs) to metric units (metric tons).\n",
"\n",
"In addition, UDFs can be registered and invoked in SQL out of the box by registering a regular python function using the `@pandas_udf()` decorator. We can then apply this UDF to our `wt` column.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 1: Importing libraries and registering a UDF\n"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# import the Pandas UDF function \n",
"from pyspark.sql.functions import pandas_udf, PandasUDFType"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@pandas_udf(\"float\")\n",
"def convert_wt(s: pd.Series) -> pd.Series:\n",
" # The formula for converting from imperial to metric tons\n",
" return s * 0.45\n",
"\n",
"spark.udf.register(\"convert_weight\", convert_wt)"
]
},
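{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a side note, the decorated `convert_wt` function can also be applied directly through the DataFrame API, without going through the SQL registration at all. A minimal sketch:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Applying the pandas UDF via the DataFrame API instead of SQL\n",
"sdf.select(\"name\", \"wt\", convert_wt(\"wt\").alias(\"weight_metric\")).show(5)"
]
},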
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task 2: Applying the UDF to the tableview\n",
"\n",
"We can now apply the `convert_weight` user-defined-function to our `wt` column from the `cars` table view. This is done very simply using the SQL query shown below. In this example below we show both the original weight (in ton-lbs) and converted weight (in metric tons).\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 13:======================================> (6 + 3) / 9]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+\n",
"| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+\n",
"| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4| 2.62| 1.179|\n",
"| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4| 2.875| 1.29375|\n",
"| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1| 2.32| 1.044|\n",
"| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1| 3.215| 1.44675|\n",
"| Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2| 3.44| 1.548|\n",
"| Valiant|18.1| 6|225.0|105|2.76| 3.46|20.22| 1| 0| 3| 1| 3.46| 1.557|\n",
"| Duster 360|14.3| 8|360.0|245|3.21| 3.57|15.84| 0| 0| 3| 4| 3.57| 1.6065|\n",
"| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2| 3.19| 1.4355|\n",
"| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2| 3.15| 1.4175|\n",
"| Merc 280|19.2| 6|167.6|123|3.92| 3.44| 18.3| 1| 0| 4| 4| 3.44| 1.548|\n",
"| Merc 280C|17.8| 6|167.6|123|3.92| 3.44| 18.9| 1| 0| 4| 4| 3.44| 1.548|\n",
"| Merc 450SE|16.4| 8|275.8|180|3.07| 4.07| 17.4| 0| 0| 3| 3| 4.07| 1.8315|\n",
"| Merc 450SL|17.3| 8|275.8|180|3.07| 3.73| 17.6| 0| 0| 3| 3| 3.73| 1.6785|\n",
"| Merc 450SLC|15.2| 8|275.8|180|3.07| 3.78| 18.0| 0| 0| 3| 3| 3.78| 1.701|\n",
"| Cadillac Fleetwood|10.4| 8|472.0|205|2.93| 5.25|17.98| 0| 0| 3| 4| 5.25| 2.3625|\n",
"|Lincoln Continental|10.4| 8|460.0|215| 3.0|5.424|17.82| 0| 0| 3| 4| 5.424| 2.4408|\n",
"| Chrysler Imperial|14.7| 8|440.0|230|3.23|5.345|17.42| 0| 0| 3| 4| 5.345| 2.40525|\n",
"| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1| 2.2| 0.99|\n",
"| Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2| 1.615| 0.72675|\n",
"| Toyota Corolla|33.9| 4| 71.1| 65|4.22|1.835| 19.9| 1| 1| 4| 1| 1.835| 0.82575|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"spark.sql(\"SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Practice Questions\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Question 1 - Basic SQL operations\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Display all Mercedez car rows from the `cars` table view we created earlier. The Mercedez cars have the prefix \"Merc\" in the car name column.\n"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+----+---+-----+---+----+----+----+---+---+----+----+\n",
"| name| mpg|cyl| disp| hp|drat| wt|qsec| vs| am|gear|carb|\n",
"+-----------+----+---+-----+---+----+----+----+---+---+----+----+\n",
"| Merc 240D|24.4| 4|146.7| 62|3.69|3.19|20.0| 1| 0| 4| 2|\n",
"| Merc 230|22.8| 4|140.8| 95|3.92|3.15|22.9| 1| 0| 4| 2|\n",
"| Merc 280|19.2| 6|167.6|123|3.92|3.44|18.3| 1| 0| 4| 4|\n",
"| Merc 280C|17.8| 6|167.6|123|3.92|3.44|18.9| 1| 0| 4| 4|\n",
"| Merc 450SE|16.4| 8|275.8|180|3.07|4.07|17.4| 0| 0| 3| 3|\n",
"| Merc 450SL|17.3| 8|275.8|180|3.07|3.73|17.6| 0| 0| 3| 3|\n",
"|Merc 450SLC|15.2| 8|275.8|180|3.07|3.78|18.0| 0| 0| 3| 3|\n",
"+-----------+----+---+-----+---+----+----+----+---+---+----+----+\n",
"\n"
]
}
],
"source": [
"# Code block for learners to answer\n",
"spark.sql(\"SELECT * FROM cars WHERE name like '%Merc%'\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Double-click **here** for a hint.\n",
"\n",
"<!-- The hint is below:\n",
"\n",
"The SQL query word `like` is used to identify patterns. \n",
"\n",
"-->\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Double-click **here** for the solution.\n",
"\n",
"<!-- The answer is below:\n",
"\n",
"spark.sql(\"SELECT * FROM cars where name like 'Merc%'\").show()\n",
"\n",
"-->\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Question 2 - User Defined Functions\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this notebook, we created a UDF to convert weight from imperial to metric units. Now for this exercise, please create a pandas UDF to convert the `mpg` column to `kmpl` (kilometers per liter). You can use the conversion factor of 0.425.\n"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"22/01/25 01:04:31 WARN analysis.SimpleFunctionRegistry: The function convert_mpg_kmpl replaced a previously registered function.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+\n",
"| name| mpg|cyl| disp| hp|drat| wt| qsec| vs| am|gear|carb| mpg| kmpl|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+\n",
"| Mazda RX4|21.0| 6|160.0|110| 3.9| 2.62|16.46| 0| 1| 4| 4|21.0| 8.925|\n",
"| Mazda RX4 Wag|21.0| 6|160.0|110| 3.9|2.875|17.02| 0| 1| 4| 4|21.0| 8.925|\n",
"| Datsun 710|22.8| 4|108.0| 93|3.85| 2.32|18.61| 1| 1| 4| 1|22.8| 9.69|\n",
"| Hornet 4 Drive|21.4| 6|258.0|110|3.08|3.215|19.44| 1| 0| 3| 1|21.4| 9.095|\n",
"| Hornet Sportabout|18.7| 8|360.0|175|3.15| 3.44|17.02| 0| 0| 3| 2|18.7| 7.9475|\n",
"| Valiant|18.1| 6|225.0|105|2.76| 3.46|20.22| 1| 0| 3| 1|18.1| 7.6925|\n",
"| Duster 360|14.3| 8|360.0|245|3.21| 3.57|15.84| 0| 0| 3| 4|14.3| 6.0775|\n",
"| Merc 240D|24.4| 4|146.7| 62|3.69| 3.19| 20.0| 1| 0| 4| 2|24.4| 10.37|\n",
"| Merc 230|22.8| 4|140.8| 95|3.92| 3.15| 22.9| 1| 0| 4| 2|22.8| 9.69|\n",
"| Merc 280|19.2| 6|167.6|123|3.92| 3.44| 18.3| 1| 0| 4| 4|19.2| 8.16|\n",
"| Merc 280C|17.8| 6|167.6|123|3.92| 3.44| 18.9| 1| 0| 4| 4|17.8| 7.565|\n",
"| Merc 450SE|16.4| 8|275.8|180|3.07| 4.07| 17.4| 0| 0| 3| 3|16.4| 6.97|\n",
"| Merc 450SL|17.3| 8|275.8|180|3.07| 3.73| 17.6| 0| 0| 3| 3|17.3| 7.3525|\n",
"| Merc 450SLC|15.2| 8|275.8|180|3.07| 3.78| 18.0| 0| 0| 3| 3|15.2| 6.46|\n",
"| Cadillac Fleetwood|10.4| 8|472.0|205|2.93| 5.25|17.98| 0| 0| 3| 4|10.4| 4.42|\n",
"|Lincoln Continental|10.4| 8|460.0|215| 3.0|5.424|17.82| 0| 0| 3| 4|10.4| 4.42|\n",
"| Chrysler Imperial|14.7| 8|440.0|230|3.23|5.345|17.42| 0| 0| 3| 4|14.7| 6.2475|\n",
"| Fiat 128|32.4| 4| 78.7| 66|4.08| 2.2|19.47| 1| 1| 4| 1|32.4| 13.77|\n",
"| Honda Civic|30.4| 4| 75.7| 52|4.93|1.615|18.52| 1| 1| 4| 2|30.4| 12.92|\n",
"| Toyota Corolla|33.9| 4| 71.1| 65|4.22|1.835| 19.9| 1| 1| 4| 1|33.9|14.4075|\n",
"+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Code block for learners to answer\n",
"@pandas_udf(\"float\")\n",
"def convert_mpg_kmpl(s: pd.Series) -> pd.Series:\n",
" return s * 0.425\n",
"\n",
"spark.udf.register(\"convert_mpg_kmpl\", convert_mpg_kmpl)\n",
"\n",
"spark.sql(\"SELECT *, mpg AS mpg, convert_mpg_kmpl(mpg) as kmpl FROM cars\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Double-click **here** for the solution.\n",
"\n",
"<!-- The answer is below:\n",
"\n",
"@pandas_udf(\"float\")\n",
"def convert_mileage(s: pd.Series) -> pd.Series:\n",
" # The formula for converting from imperial to metric tons\n",
" return s * 0.425\n",
"\n",
"spark.udf.register(\"convert_mileage\", convert_mileage)\n",
"\n",
"spark.sql(\"SELECT *, mpg AS mpg, convert_weight(mpg) as kmpl FROM cars\").show()\n",
"-->\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Authors\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[Karthik Muthuraman](https://www.linkedin.com/in/karthik-muthuraman/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0225ENSkillsNetwork25716109-2021-01-01)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Other Contributors\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[Jerome Nilmeier](https://github.com/nilmeier)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Change Log\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"| Date (YYYY-MM-DD) | Version | Changed By | Change Description |\n",
"| ----------------- | ------- | ---------- | ------------------ |\n",
"| 2021-07-02 | 0.2 | Karthik | Beta launch |\n",
"| 2021-06-30 | 0.1 | Karthik | First Draft |\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright © 2021 IBM Corporation. All rights reserved.\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python",
"language": "python",
"name": "conda-env-python-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}