{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Spark Walmart Exercise \n",
"\n",
"Based on the [Walmart stock data 2012-2016](https://raw.githubusercontent.com/clumdee/Python-and-Spark-for-Big-Data-master/master/Spark_DataFrame_Project_Exercise/walmart_stock.csv)."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"import findspark, os\n",
"from pyspark.sql import SparkSession\n",
"from pyspark import SparkContext\n",
"\n",
"os.environ[\"JAVA_HOME\"]=\"/Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home\"\n",
"\n",
"print(findspark.find())\n",
"findspark.init()\n",
"\n",
"sc = SparkContext.getOrCreate()\n",
"spark = SparkSession.Builder().appName('Exercise').getOrCreate()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"/usr/local/Cellar/apache-spark/2.4.3/libexec/\n"
]
}
],
"execution_count": 1,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### Load the Walmart Stock CSV File, have Spark infer the data types."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df = spark.read.csv('/Users/swa/Desktop/LargeFiles/Walmart/walmart_stock.csv', inferSchema=True, header=True)"
],
"outputs": [],
"execution_count": 15,
"metadata": {}
},
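{
"cell_type": "markdown",
"source": [
"As an alternative to inference, the schema can be declared explicitly; a minimal sketch below, with names and types copied from the `printSchema` output further down. The read itself is left commented out so `df` keeps the inferred schema."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, IntegerType\n",
"\n",
"# Explicit schema matching what inferSchema produces; skips the extra pass over the file.\n",
"schema = StructType([\n",
"    StructField('Date', TimestampType(), True),\n",
"    StructField('Open', DoubleType(), True),\n",
"    StructField('High', DoubleType(), True),\n",
"    StructField('Low', DoubleType(), True),\n",
"    StructField('Close', DoubleType(), True),\n",
"    StructField('Volume', IntegerType(), True),\n",
"    StructField('Adj Close', DoubleType(), True)\n",
"])\n",
"# df = spark.read.csv('/Users/swa/Desktop/LargeFiles/Walmart/walmart_stock.csv', schema=schema, header=True)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},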
{
"cell_type": "markdown",
"source": [
"The `show` and `head` methods do not display a pretty table, the following ways is somewhat more pleasing:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.limit(5).toPandas()"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 22,
"data": {
"text/plain": [
" Date Open High Low Close Volume Adj Close\n",
"0 2012-01-03 59.970001 61.060001 59.869999 60.330002 12668800 52.619235\n",
"1 2012-01-04 60.209999 60.349998 59.470001 59.709999 9593300 52.078475\n",
"2 2012-01-05 59.349998 59.619999 58.369999 59.419998 12768200 51.825539\n",
"3 2012-01-06 59.419998 59.450001 58.869999 59.000000 8069400 51.459220\n",
"4 2012-01-09 59.029999 59.549999 58.919998 59.180000 6679300 51.616215"
],
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Date</th>\n",
" <th>Open</th>\n",
" <th>High</th>\n",
" <th>Low</th>\n",
" <th>Close</th>\n",
" <th>Volume</th>\n",
" <th>Adj Close</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2012-01-03</td>\n",
" <td>59.970001</td>\n",
" <td>61.060001</td>\n",
" <td>59.869999</td>\n",
" <td>60.330002</td>\n",
" <td>12668800</td>\n",
" <td>52.619235</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2012-01-04</td>\n",
" <td>60.209999</td>\n",
" <td>60.349998</td>\n",
" <td>59.470001</td>\n",
" <td>59.709999</td>\n",
" <td>9593300</td>\n",
" <td>52.078475</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2012-01-05</td>\n",
" <td>59.349998</td>\n",
" <td>59.619999</td>\n",
" <td>58.369999</td>\n",
" <td>59.419998</td>\n",
" <td>12768200</td>\n",
" <td>51.825539</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2012-01-06</td>\n",
" <td>59.419998</td>\n",
" <td>59.450001</td>\n",
" <td>58.869999</td>\n",
" <td>59.000000</td>\n",
" <td>8069400</td>\n",
" <td>51.459220</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2012-01-09</td>\n",
" <td>59.029999</td>\n",
" <td>59.549999</td>\n",
" <td>58.919998</td>\n",
" <td>59.180000</td>\n",
" <td>6679300</td>\n",
" <td>51.616215</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
]
},
"metadata": {}
}
],
"execution_count": 22,
"metadata": {
"collapsed": false,
"outputHidden": false,
"inputHidden": false
}
},
{
"cell_type": "markdown",
"source": [
"#### What are the column names?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"str.join(\", \", df.columns)"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 24,
"data": {
"text/plain": [
"'Date, Open, High, Low, Close, Volume, Adj Close'"
]
},
"metadata": {}
}
],
"execution_count": 24,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### What does the Schema look like?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.printSchema()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"root\n",
" |-- Date: timestamp (nullable = true)\n",
" |-- Open: double (nullable = true)\n",
" |-- High: double (nullable = true)\n",
" |-- Low: double (nullable = true)\n",
" |-- Close: double (nullable = true)\n",
" |-- Volume: integer (nullable = true)\n",
" |-- Adj Close: double (nullable = true)\n",
"\n"
]
}
],
"execution_count": 4,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### The first 5 columns"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.select(df.columns[0:5]).show(3)"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-------------------+------------------+---------+---------+------------------+\n",
"| Date| Open| High| Low| Close|\n",
"+-------------------+------------------+---------+---------+------------------+\n",
"|2012-01-03 00:00:00| 59.970001|61.060001|59.869999| 60.330002|\n",
"|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996|\n",
"|2012-01-05 00:00:00| 59.349998|59.619999|58.369999| 59.419998|\n",
"+-------------------+------------------+---------+---------+------------------+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"execution_count": 30,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### Describing the DataFrame."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.describe().show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+\n",
"|summary| Open| High| Low| Close| Volume| Adj Close|\n",
"+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+\n",
"| count| 1258| 1258| 1258| 1258| 1258| 1258|\n",
"| mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|\n",
"| stddev| 6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991| 4519780.8431556|6.722609449996857|\n",
"| min|56.389998999999996| 57.060001| 56.299999| 56.419998| 2094900| 50.363689|\n",
"| max| 90.800003| 90.970001| 89.25| 90.470001| 80898100|84.91421600000001|\n",
"+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+\n",
"\n"
]
}
],
"execution_count": 8,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"Too many digits, so let's format a bit."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import format_number\n",
"desc = df.describe()\n",
"desc.select(desc['summary'], format_number(desc['Open'].cast('float'), 2).alias('Open'), \n",
" format_number(desc['High'].cast('float'), 2).alias('High'),\n",
" format_number(desc['Low'].cast('float'), 2).alias('Low'), \n",
" format_number(desc['Close'].cast('float'), 2).alias('Close'),\n",
" format_number(desc['Volume'].cast('float'), 2).alias('Adj Close')).show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-------+--------+--------+--------+--------+-------------+\n",
"|summary| Open| High| Low| Close| Adj Close|\n",
"+-------+--------+--------+--------+--------+-------------+\n",
"| count|1,258.00|1,258.00|1,258.00|1,258.00| 1,258.00|\n",
"| mean| 72.36| 72.84| 71.92| 72.39| 8,222,093.50|\n",
"| stddev| 6.77| 6.77| 6.74| 6.76| 4,519,781.00|\n",
"| min| 56.39| 57.06| 56.30| 56.42| 2,094,900.00|\n",
"| max| 90.80| 90.97| 89.25| 90.47|80,898,096.00|\n",
"+-------+--------+--------+--------+--------+-------------+\n",
"\n"
]
}
],
"execution_count": 32,
"metadata": {}
},
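{
"cell_type": "markdown",
"source": [
"The `float` cast above loses precision on the large `Volume` counts (the max reads 80,898,096 instead of 80,898,100); a minimal sketch that casts to `double` and drops the decimals instead:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# double keeps all digits of the Volume stats; 0 decimals suits counts.\n",
"desc.select(desc['summary'],\n",
"            format_number(desc['Volume'].cast('double'), 0).alias('Volume')).show()"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},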
{
"cell_type": "markdown",
"source": [
"#### What day had the Peak High in Price?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.orderBy(df['High'].desc()).take(1)[0][0]"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 33,
"data": {
"text/plain": [
"datetime.datetime(2015, 1, 13, 0, 0)"
]
},
"metadata": {}
}
],
"execution_count": 33,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### What is the mean of the Close column?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import avg\n",
"df.select(avg('Close')).show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-----------------+\n",
"| avg(Close)|\n",
"+-----------------+\n",
"|72.38844998012726|\n",
"+-----------------+\n",
"\n"
]
}
],
"execution_count": 34,
"metadata": {
"scrolled": true
}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import mean\n",
"df.select(mean('Close')).show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-----------------+\n",
"| avg(Close)|\n",
"+-----------------+\n",
"|72.38844998012726|\n",
"+-----------------+\n",
"\n"
]
}
],
"execution_count": 35,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### What is the max and min of the Volume column?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import min, max\n",
"df.select(max(df['Volume']), min(df['Volume'])).show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-----------+-----------+\n",
"|max(Volume)|min(Volume)|\n",
"+-----------+-----------+\n",
"| 80898100| 2094900|\n",
"+-----------+-----------+\n",
"\n"
]
}
],
"execution_count": 36,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### How many days was the Close lower than 60 dollars?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import countDistinct\n",
"df.filter(df['Close'] < 60).select(countDistinct(df['Date'])).collect()[0][0]"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 33,
"data": {
"text/plain": [
"81"
]
},
"metadata": {}
}
],
"execution_count": 33,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"The following works as well"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.filter(df['Close'] < 60).count()"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 68,
"data": {
"text/plain": [
"81"
]
},
"metadata": {}
}
],
"execution_count": 68,
"metadata": {}
},
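{
"cell_type": "markdown",
"source": [
"So does a SQL expression string, since `filter` accepts those too:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Same count of 81, with the condition written as SQL.\n",
"df.filter('Close < 60').count()"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},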
{
"cell_type": "markdown",
"source": [
"#### What percentage of the time was the High greater than 80 dollars ?\n",
"#### In other words, (Number of Days High>80)/(Total Days in the dataset)"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.filter(df['High'] > 80).select(countDistinct(df['Date'])).collect()[0][0] / df.select(countDistinct(df['Date'])).collect()[0][0] * 100"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 37,
"data": {
"text/plain": [
"9.141494435612083"
]
},
"metadata": {}
}
],
"execution_count": 37,
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.filter(df['High'] > 80).count() / df.count() * 100"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 71,
"data": {
"text/plain": [
"9.141494435612083"
]
},
"metadata": {}
}
],
"execution_count": 71,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### What is the Pearson correlation between High and Volume?\n"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"df.corr('High', 'Volume')"
],
"outputs": [
{
"output_type": "execute_result",
"execution_count": 37,
"data": {
"text/plain": [
"-0.3384326061737161"
]
},
"metadata": {}
}
],
"execution_count": 37,
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import corr\n",
"df.select(corr('High', 'Volume')).show()"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-------------------+\n",
"| corr(High, Volume)|\n",
"+-------------------+\n",
"|-0.3384326061737161|\n",
"+-------------------+\n",
"\n"
]
}
],
"execution_count": 38,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"#### What is the max High per year?"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import year\n",
"\n",
"year_df = df.withColumn('Year', year(df['Date']))\n",
"year_df.show(3)"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+-------------------+------------------+---------+---------+------------------+--------+------------------+----+\n",
"| Date| Open| High| Low| Close| Volume| Adj Close|Year|\n",
"+-------------------+------------------+---------+---------+------------------+--------+------------------+----+\n",
"|2012-01-03 00:00:00| 59.970001|61.060001|59.869999| 60.330002|12668800|52.619234999999996|2012|\n",
"|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300| 52.078475|2012|\n",
"|2012-01-05 00:00:00| 59.349998|59.619999|58.369999| 59.419998|12768200| 51.825539|2012|\n",
"+-------------------+------------------+---------+---------+------------------+--------+------------------+----+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"execution_count": 39,
"metadata": {}
}
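,
{
"cell_type": "markdown",
"source": [
"With the `Year` column in place, the actual answer is a `groupBy`; a minimal sketch:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Max High per calendar year, ordered chronologically.\n",
"year_df.groupBy('Year').max('High').orderBy('Year').show()"
],
"outputs": [],
"execution_count": null,
"metadata": {}
}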
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"name": "python3",
"language": "python",
"display_name": "Python 3"
},
"language_info": {
"name": "python",
"version": "3.7.2",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
},
"kernel_info": {
"name": "python3"
},
"gist_id": "4a35e75f2bb41563d3d8304cff0413c0",
"nteract": {
"version": "0.14.4"
}
},
"nbformat": 4,
"nbformat_minor": 1
}