@Abuton
Created January 5, 2021 11:04
Final Assignment on Scalable ML using pyspark
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": "Welcome to the final project of \u201cApache Spark for Scalable Machine Learning on BigData\u201d. In this assignment you\u2019ll analyze a real-world dataset and apply machine learning to it using Apache Spark. \n\nIn order to pass, you need to implement some code (as described in the instruction section on Coursera) and finally answer a quiz on the Coursera platform.\n"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "This notebook is designed to run in an IBM Watson Studio default runtime, NOT the Watson Studio Apache Spark runtime, because the default runtime with 1 vCPU is free of charge. Therefore, we install Apache Spark in local mode for test purposes only. Please don't use it in production.\n\nIf you are facing issues, please read the following two documents first:\n\nhttps://github.com/IBM/skillsnetwork/wiki/Environment-Setup\n\nhttps://github.com/IBM/skillsnetwork/wiki/FAQ\n\nThen, please feel free to ask:\n\nhttps://coursera.org/learn/machine-learning-big-data-apache-spark/discussions/all\n\nPlease make sure to follow the guidelines before asking a question:\n\nhttps://github.com/IBM/skillsnetwork/wiki/FAQ#im-feeling-lost-and-confused-please-help-me\n\n\nThe notebook should also work outside Watson Studio. If you are running in an existing Apache Spark context outside Watson Studio, please remove the Apache Spark setup in the first notebook cells."
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": "from IPython.display import Markdown, display\n\ndef printmd(string):\n    # render the message as a red level-1 heading\n    display(Markdown('# <span style=\"color:red\">' + string + '</span>'))\n\n\n# a pre-existing variable named 'sc' indicates a runtime that already provides a SparkContext\nif 'sc' in locals() or 'sc' in globals():\n    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "!pip install pyspark==2.4.5"
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": "try:\n from pyspark import SparkContext, SparkConf\n from pyspark.sql import SparkSession\nexcept ImportError as e:\n printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')"
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": "sc = SparkContext.getOrCreate(SparkConf().setMaster(\"local[*]\"))\n\nspark = SparkSession \\\n .builder \\\n .getOrCreate()"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Let\u2019s start by downloading the dataset and creating a dataframe. This dataset can be found on DAX, the IBM Data Asset Exchange, and can be downloaded for free.\n\nhttps://developer.ibm.com/exchanges/data/all/jfk-weather-data/"
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "--2021-01-05 10:02:25-- https://dax-cdn.cdn.appdomain.cloud/dax-noaa-weather-data-jfk-airport/1.1.4/noaa-weather-data-jfk-airport.tar.gz\nResolving dax-cdn.cdn.appdomain.cloud (dax-cdn.cdn.appdomain.cloud)... 95.101.143.203, 88.221.134.250, 88.221.135.8, ...\nConnecting to dax-cdn.cdn.appdomain.cloud (dax-cdn.cdn.appdomain.cloud)|95.101.143.203|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 3509993 (3.3M) [application/x-tar]\nSaving to: \u2018noaa-weather-data-jfk-airport.tar.gz\u2019\n\nnoaa-weather-data-j 100%[===================>] 3.35M --.-KB/s in 0.08s \n\n2021-01-05 10:02:25 (39.6 MB/s) - \u2018noaa-weather-data-jfk-airport.tar.gz\u2019 saved [3509993/3509993]\n\nnoaa-weather-data-jfk-airport/jfk_weather.csv\n"
}
],
"source": "# delete files from previous runs\n!rm -rf noaa-weather-data-jfk-airport*\n\n# download the file containing the data in CSV format\n!wget https://dax-cdn.cdn.appdomain.cloud/dax-noaa-weather-data-jfk-airport/1.1.4/noaa-weather-data-jfk-airport.tar.gz\n\n# extract the data\n!tar xvfz noaa-weather-data-jfk-airport.tar.gz noaa-weather-data-jfk-airport/jfk_weather.csv\n \n# create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents\ndf = spark.read.option(\"header\", \"true\").option(\"inferSchema\",\"true\").csv('noaa-weather-data-jfk-airport/jfk_weather.csv')\n\n# register a corresponding query table\ndf.createOrReplaceTempView('df')"
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "| STATION| STATION_NAME|ELEVATION|LATITUDE|LONGITUDE| DATE|REPORTTPYE| HOURLYSKYCONDITIONS|HOURLYVISIBILITY|HOURLYPRSENTWEATHERTYPE|HOURLYDRYBULBTEMPF|HOURLYDRYBULBTEMPC|HOURLYWETBULBTEMPF|HOURLYWETBULBTEMPC|HOURLYDewPointTempF|HOURLYDewPointTempC|HOURLYRelativeHumidity|HOURLYWindSpeed|HOURLYWindDirection|HOURLYWindGustSpeed|HOURLYStationPressure|HOURLYPressureTendency|HOURLYPressureChange|HOURLYSeaLevelPressure|HOURLYPrecip|HOURLYAltimeterSetting|DAILYMaximumDryBulbTemp|DAILYMinimumDryBulbTemp|DAILYAverageDryBulbTemp|DAILYDeptFromNormalAverageTemp|DAILYAverageRelativeHumidity|DAILYAverageDewPointTemp|DAILYAverageWetBulbTemp|DAILYHeatingDegreeDays|DAILYCoolingDegreeDays|DAILYSunrise|DAILYSunset|DAILYWeather|DAILYPrecip|DAILYSnowfall|DAILYSnowDepth|DAILYAverageStationPressure|DAILYAverageSeaLevelPressure|DAILYAverageWindSpeed|DAILYPeakWindSpeed|PeakWindDirection|DAILYSustainedWindSpeed|DAILYSustainedWindDirection|MonthlyMaximumTemp|MonthlyMinimumTemp|MonthlyMeanTemp|MonthlyAverageRH|MonthlyDewpointTemp|MonthlyWetBulbTemp|MonthlyAvgHeatingDegreeDays|MonthlyAvgCoolingDegreeDays|MonthlyStationPressure|MonthlySeaLevelPressure|MonthlyAverageWindSpeed|MonthlyTotalSnowfall|MonthlyDeptFromNormalMaximumTemp|MonthlyDeptFromNormalMinimumTemp|MonthlyDeptFromNormalAverageTemp|MonthlyDeptFromNormalPrecip|MonthlyTotalLiquidPrecip|MonthlyGreatestPrecip|MonthlyGreatestPrecipDate|MonthlyGreatestSnowfall|MonthlyGreatestSnowfallDate|MonthlyGreatestSnowDepth|MonthlyGreatestSnowDepthDate|MonthlyDaysWithGT90Temp|MonthlyDaysWithLT32Temp|MonthlyDaysWithGT32Temp|MonthlyDaysWithLT0Temp|MonthlyDaysWithGT001Precip|MonthlyDaysWithGT010Precip|MonthlyDaysWithGT1Snow|MonthlyMaxSeaLevelPressureValue|MonthlyMaxSeaLevelPressureDate|MonthlyMaxSeaLevelPressureTime|MonthlyMinSeaLevelPressureValue|MonthlyMinSeaLevelPressureDate|MonthlyMinSeaLevelPressureTime|MonthlyTotalHeatingDegreeDays|MonthlyTotalCoolingDegreeDays|MonthlyDeptFromNormalHeatingDD|MonthlyDeptFromNormalCoolingDD|MonthlyTotalSeasonToDa
teHeatingDD|MonthlyTotalSeasonToDateCoolingDD|\nn|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 00:51| FM-15|FEW:02 7 SCT:04 1...| 6.00| -RA:02 PL:06 BR:1...| 33| 0.6| 32| 0.1| 31| -0.6| 92| 0| 000| null| 29.97| 8| null| 29.99| 0.01| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 01:00| FM-12| null| 5.59| ||PL:79| 33| 0.6| 32| 0.1| 31| -0.6| 92| 0| 000| null| 29.96| 8| +0.05| 29.99| null| null| 34| 30| 32| -1.7| null| null| null| 33| 0| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 01:51| FM-15|FEW:02 6 SCT:04 1...| 6.00| -RA:02 PL:06 BR:1...| 33| 0.6| 33| 0.3| 32| 0.0| 96| 0| 000| null| 29.97| null| null| 29.99| 0.02| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:03| FM-16|FEW:02 6 SCT:04 1...| 6.00| -RA:02 BR:1 |RA:6...| 34| 1.0| 33| 0.7| 32| 0.0| 93| 0| 000| null| 29.97| 
null| null| null| T| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:28| FM-16|BKN:07 7 BKN:07 1...| 5.00| -RA:02 BR:1 |RA:6...| 34| 1.0| 33| 0.7| 32| 0.0| 93| 0| 000| null| 29.97| null| null| null| T| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:51| FM-15| BKN:07 7 OVC:08 15| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 33| 0.3| 32| 0.0| 96| 0| 000| null| 29.97| null| null| 29.99| T| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 03:51| FM-15| FEW:02 5 OVC:08 12| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 33| 0.3| 32| 0.0| 96| 0| 000| null| 29.95| 8| null| 29.97| T| 29.97| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 
null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 04:00| FM-12| null| 4.97| ||RA:61| 33| 0.6| 33| 0.3| 32| 0.0| 96| 0| 000| null| 29.94| 8| +0.03| 29.97| null| null| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 04:51| FM-15| BKN:07 11 OVC:08 35| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 32| 0.1| 31| -0.6| 92| 0| 000| null| 29.93| null| null| 29.96| T| 29.95| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 05:49| FM-16| BKN:07 9| 5.00| BR:1 ||| 34| 1.0| 32| 0.3| 30| -1.0| 87| 0| 000| null| 29.95| null| null| null| null| 29.97| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\nnonly showing top 10 rows\n\n"
}
],
"source": "df.show(10)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "The dataset contains some null values, so schema inference didn\u2019t work properly for all columns. In addition, some columns contain stray characters (such as a trailing \"s\" or \"*\"), so we need to clean up the data set first. This is a normal task in any data science project, since real data is rarely clean. Don\u2019t worry if you don\u2019t understand all of the code; you won\u2019t be asked about it. "
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": "import random\nrandom.seed(42)\n\nfrom pyspark.sql.functions import translate, col\n\n# Overwrite each column with a cleaned, correctly typed version of itself.\n# translate() maps characters one-to-one; with an empty replacement string it\n# deletes every listed character (here 's', ',' and '*') from the value.\ndf_cleaned = (\n    df\n    .withColumn(\"HOURLYWindSpeed\", df.HOURLYWindSpeed.cast('double'))\n    .withColumn(\"HOURLYWindDirection\", df.HOURLYWindDirection.cast('double'))\n    .withColumn(\"HOURLYStationPressure\", translate(col(\"HOURLYStationPressure\"), \"s,\", \"\"))\n    .withColumn(\"HOURLYPrecip\", translate(col(\"HOURLYPrecip\"), \"s,\", \"\"))\n    .withColumn(\"HOURLYRelativeHumidity\", translate(col(\"HOURLYRelativeHumidity\"), \"*\", \"\"))\n    .withColumn(\"HOURLYDRYBULBTEMPC\", translate(col(\"HOURLYDRYBULBTEMPC\"), \"*\", \"\"))\n)\n\n# Cast the cleaned string columns to double; values that still fail to parse become null.\ndf_cleaned = (\n    df_cleaned\n    .withColumn(\"HOURLYStationPressure\", df_cleaned.HOURLYStationPressure.cast('double'))\n    .withColumn(\"HOURLYPrecip\", df_cleaned.HOURLYPrecip.cast('double'))\n    .withColumn(\"HOURLYRelativeHumidity\", df_cleaned.HOURLYRelativeHumidity.cast('double'))\n    .withColumn(\"HOURLYDRYBULBTEMPC\", df_cleaned.HOURLYDRYBULBTEMPC.cast('double'))\n)\n\n# SQL filter: drop rows with zero wind speed and rows with a null in any column used later.\ndf_filtered = df_cleaned.filter(\"\"\"\n    HOURLYWindSpeed <> 0\n    and HOURLYWindSpeed IS NOT NULL\n    and HOURLYWindDirection IS NOT NULL\n    and HOURLYStationPressure IS NOT NULL\n    and HOURLYPressureTendency IS NOT NULL\n    and HOURLYPrecip IS NOT NULL\n    and HOURLYRelativeHumidity IS NOT NULL\n    and HOURLYDRYBULBTEMPC IS NOT NULL\n\"\"\")"
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "+----------+--------------------+---------+--------+---------+----------------+----------+--------------------+----------------+-----------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+----------------------+---------------+-------------------+-------------------+---------------------+----------------------+--------------------+----------------------+------------+----------------------+-----------------------+-----------------------+-----------------------+------------------------------+----------------------------+------------------------+-----------------------+----------------------+----------------------+------------+-----------+------------+-----------+-------------+--------------+---------------------------+----------------------------+---------------------+------------------+-----------------+-----------------------+---------------------------+------------------+------------------+---------------+----------------+-------------------+------------------+---------------------------+---------------------------+----------------------+-----------------------+-----------------------+--------------------+--------------------------------+--------------------------------+--------------------------------+---------------------------+------------------------+---------------------+-------------------------+-----------------------+---------------------------+------------------------+----------------------------+-----------------------+-----------------------+-----------------------+----------------------+--------------------------+--------------------------+----------------------+-------------------------------+------------------------------+------------------------------+-------------------------------+------------------------------+------------------------------+-----------------------------+-----------------------------+------------------------------+------------------------------
+---------------------------------+---------------------------------+\n| STATION| STATION_NAME|ELEVATION|LATITUDE|LONGITUDE| DATE|REPORTTPYE| HOURLYSKYCONDITIONS|HOURLYVISIBILITY|HOURLYPRSENTWEATHERTYPE|HOURLYDRYBULBTEMPF|HOURLYDRYBULBTEMPC|HOURLYWETBULBTEMPF|HOURLYWETBULBTEMPC|HOURLYDewPointTempF|HOURLYDewPointTempC|HOURLYRelativeHumidity|HOURLYWindSpeed|HOURLYWindDirection|HOURLYWindGustSpeed|HOURLYStationPressure|HOURLYPressureTendency|HOURLYPressureChange|HOURLYSeaLevelPressure|HOURLYPrecip|HOURLYAltimeterSetting|DAILYMaximumDryBulbTemp|DAILYMinimumDryBulbTemp|DAILYAverageDryBulbTemp|DAILYDeptFromNormalAverageTemp|DAILYAverageRelativeHumidity|DAILYAverageDewPointTemp|DAILYAverageWetBulbTemp|DAILYHeatingDegreeDays|DAILYCoolingDegreeDays|DAILYSunrise|DAILYSunset|DAILYWeather|DAILYPrecip|DAILYSnowfall|DAILYSnowDepth|DAILYAverageStationPressure|DAILYAverageSeaLevelPressure|DAILYAverageWindSpeed|DAILYPeakWindSpeed|PeakWindDirection|DAILYSustainedWindSpeed|DAILYSustainedWindDirection|MonthlyMaximumTemp|MonthlyMinimumTemp|MonthlyMeanTemp|MonthlyAverageRH|MonthlyDewpointTemp|MonthlyWetBulbTemp|MonthlyAvgHeatingDegreeDays|MonthlyAvgCoolingDegreeDays|MonthlyStationPressure|MonthlySeaLevelPressure|MonthlyAverageWindSpeed|MonthlyTotalSnowfall|MonthlyDeptFromNormalMaximumTemp|MonthlyDeptFromNormalMinimumTemp|MonthlyDeptFromNormalAverageTemp|MonthlyDeptFromNormalPrecip|MonthlyTotalLiquidPrecip|MonthlyGreatestPrecip|MonthlyGreatestPrecipDate|MonthlyGreatestSnowfall|MonthlyGreatestSnowfallDate|MonthlyGreatestSnowDepth|MonthlyGreatestSnowDepthDate|MonthlyDaysWithGT90Temp|MonthlyDaysWithLT32Temp|MonthlyDaysWithGT32Temp|MonthlyDaysWithLT0Temp|MonthlyDaysWithGT001Precip|MonthlyDaysWithGT010Precip|MonthlyDaysWithGT1Snow|MonthlyMaxSeaLevelPressureValue|MonthlyMaxSeaLevelPressureDate|MonthlyMaxSeaLevelPressureTime|MonthlyMinSeaLevelPressureValue|MonthlyMinSeaLevelPressureDate|MonthlyMinSeaLevelPressureTime|MonthlyTotalHeatingDegreeDays|MonthlyTotalCoolingDegreeDays|MonthlyDeptFromNorm
alHeatingDD|MonthlyDeptFromNormalCoolingDD|MonthlyTotalSeasonToDateHeatingDD|MonthlyTotalSeasonToDateCoolingDD|\nn|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 00:51| FM-15|FEW:02 7 SCT:04 1...| 6.00| -RA:02 PL:06 BR:1...| 33| 0.6| 32| 0.1| 31| -0.6| 92.0| 0.0| 0.0| null| 29.97| 8| null| 29.99| 0.01| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 01:00| FM-12| null| 5.59| ||PL:79| 33| 0.6| 32| 0.1| 31| -0.6| 92.0| 0.0| 0.0| null| 29.96| 8| +0.05| 29.99| null| null| 34| 30| 32| -1.7| null| null| null| 33| 0| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 01:51| FM-15|FEW:02 6 SCT:04 1...| 6.00| -RA:02 PL:06 BR:1...| 33| 0.6| 33| 0.3| 32| 0.0| 96.0| 0.0| 0.0| null| 29.97| null| null| 29.99| 0.02| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:03| FM-16|FEW:02 6 SCT:04 1...| 
6.00| -RA:02 BR:1 |RA:6...| 34| 1.0| 33| 0.7| 32| 0.0| 93.0| 0.0| 0.0| null| 29.97| null| null| null| null| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:28| FM-16|BKN:07 7 BKN:07 1...| 5.00| -RA:02 BR:1 |RA:6...| 34| 1.0| 33| 0.7| 32| 0.0| 93.0| 0.0| 0.0| null| 29.97| null| null| null| null| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 02:51| FM-15| BKN:07 7 OVC:08 15| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 33| 0.3| 32| 0.0| 96.0| 0.0| 0.0| null| 29.97| null| null| 29.99| null| 29.99| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 03:51| FM-15| FEW:02 5 OVC:08 12| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 33| 0.3| 32| 0.0| 96.0| 0.0| 0.0| null| 29.95| 8| null| 29.97| null| 29.97| null| null| null| null| null| null| null| null| null| 719| 1639| 
null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 04:00| FM-12| null| 4.97| ||RA:61| 33| 0.6| 33| 0.3| 32| 0.0| 96.0| 0.0| 0.0| null| 29.94| 8| +0.03| 29.97| null| null| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 04:51| FM-15| BKN:07 11 OVC:08 35| 5.00| -RA:02 BR:1 |RA:6...| 33| 0.6| 32| 0.1| 31| -0.6| 92.0| 0.0| 0.0| null| 29.93| null| null| 29.96| null| 29.95| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\n|WBAN:94789|JFK INTERNATIONAL...| 3.4| 40.6386| -73.7622|2010-01-01 05:49| FM-16| BKN:07 9| 5.00| BR:1 ||| 34| 1.0| 32| 0.3| 30| -1.0| 87.0| 0.0| 0.0| null| 29.95| null| null| null| null| 29.97| null| null| null| null| null| null| null| null| null| 719| 1639| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 
null| null| null| null| -9999| -9999| null| -9999| -9999| null| null| null| null| null| null|\nonly showing top 10 rows\n\n"
}
],
"source": "df_cleaned.show(10)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "We want to predict the value of one column based on some others. It is sometimes helpful to print a correlation matrix. "
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": "array([[ 1. , 0.25478863, -0.26171147],\n [ 0.25478863, 1. , -0.13377466],\n [-0.26171147, -0.13377466, 1. ]])"
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": "from pyspark.ml.feature import VectorAssembler\nvectorAssembler = VectorAssembler(inputCols=[\"HOURLYWindSpeed\",\"HOURLYWindDirection\",\"HOURLYStationPressure\"],\n outputCol=\"features\")\ndf_pipeline = vectorAssembler.transform(df_filtered)\nfrom pyspark.ml.stat import Correlation\nCorrelation.corr(df_pipeline,\"features\").head()[0].toArray()"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "As we can see, HOURLYWindSpeed and HOURLYWindDirection correlate with 0.25478863, whereas HOURLYWindSpeed and HOURLYStationPressure correlate with -0.26171147. Both correlations are weak but clearly non-zero, which is an encouraging sign if we want to predict HOURLYWindSpeed from HOURLYWindDirection and HOURLYStationPressure. Note that the numbers can change over time as we are always working with the latest data.\n\nSince this is supervised learning, let\u2019s split our data into a train (80%) and a test (20%) set."
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": "# # what is the correlation between HOURLYWindSpeed and HOURLYPressureTendency\n# vectAss = VectorAssembler(inputCols=['HOURLYWindSpeed', 'HOURLYPressureTendency'], outputCol='windspeedpressure')\n# df_pipe = vectAss.transform(df_filtered)\n# Correlation.corr(df_pipe, 'windspeedpressure').head()[0].toArray()"
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": "splits = df_filtered.randomSplit([0.8, 0.2])\ndf_train = splits[0]\ndf_test = splits[1]"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Again, we can re-use our feature engineering pipeline"
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": "from pyspark.ml.feature import StringIndexer, OneHotEncoder\n# from pyspark.ml.linalg import Vectors\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.feature import Normalizer\nfrom pyspark.ml import Pipeline\n\nvectorAssembler = VectorAssembler(inputCols=[\n \"HOURLYWindDirection\",\n \"ELEVATION\",\n \"HOURLYStationPressure\"],\n outputCol=\"features\")\n\nnormalizer = Normalizer(inputCol=\"features\", outputCol=\"features_norm\", p=1.0)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Now we define a function for evaluating our regression prediction performance. We\u2019re using RMSE (Root Mean Squared Error) here; the smaller, the better.\n\n"
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": "def regression_metrics(prediction):\n from pyspark.ml.evaluation import RegressionEvaluator\n evaluator = RegressionEvaluator(\n labelCol=\"HOURLYWindSpeed\", predictionCol=\"prediction\", metricName=\"rmse\")\n rmse = evaluator.evaluate(prediction)\n print(\"RMSE on test data = %g\" % rmse)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Let\u2019s run a linear regression model first for building a baseline.\n\n"
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "RMSE on test data = 5.30775\n"
}
],
"source": "#LR1\n\nfrom pyspark.ml.regression import LinearRegression\n\n\nlr = LinearRegression(labelCol=\"HOURLYWindSpeed\", featuresCol='features', maxIter=100, regParam=0.0, elasticNetParam=0.0)\npipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nregression_metrics(prediction)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Implement another linear regression that uses the normalized features (features_norm) instead of the original feature values, then re-evaluate it, i.e. compute the RMSE again.\n"
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "RMSE on test data = 5.53835\n"
}
],
"source": "lr = LinearRegression(labelCol=\"HOURLYWindSpeed\", featuresCol='features_norm', maxIter=100, regParam=0.0, elasticNetParam=0.0)\npipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nregression_metrics(prediction)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "It looks like normalizing the features does not reduce the RMSE. In fact, the model trained on the normalized features has a higher RMSE (5.53835) than the one trained on the unnormalized features (5.30775)."
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Now we\u2019ll try a Gradient Boosted Tree Regressor"
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "RMSE on test data = 5.11646\n"
}
],
"source": "#GBT1\n\nfrom pyspark.ml.regression import GBTRegressor\ngbt = GBTRegressor(labelCol=\"HOURLYWindSpeed\", maxIter=100)\npipeline = Pipeline(stages=[vectorAssembler, normalizer,gbt])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nregression_metrics(prediction)"
},
{
"cell_type": "markdown",
"metadata": {},
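"source": "Is this a better model? On this split, the GBT regressor reaches an RMSE of 5.11646, which is lower than the linear regression baseline (5.30775)."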
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Now let\u2019s switch gears. Previously, we tried to predict HOURLYWindSpeed, but now we predict HOURLYWindDirection. In order to turn this into a classification problem, we discretize the value using the Bucketizer. The new feature is called HOURLYWindDirectionBucketized.\n\nBy discretizing, we bin the floating-point values of HOURLYWindDirection into categories, here one bucket for directions below 180 and one for directions of 180 and above."
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": "from pyspark.ml.feature import Bucketizer, OneHotEncoder\nbucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],inputCol=\"HOURLYWindDirection\", outputCol=\"HOURLYWindDirectionBucketized\")\nencoder = OneHotEncoder(inputCol=\"HOURLYWindDirectionBucketized\", outputCol=\"HOURLYWindDirectionOHE\")\n"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Again, we define a function in order to assess how we perform. Here we just use the accuracy measure, which gives us the fraction of correctly classified examples: 0 is bad, 1 is good."
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": "def classification_metrics(prediction):\n    from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n    mcEval = (\n        MulticlassClassificationEvaluator()\n        .setMetricName(\"accuracy\")\n        .setPredictionCol(\"prediction\")\n        .setLabelCol(\"HOURLYWindDirectionBucketized\")\n    )\n    accuracy = mcEval.evaluate(prediction)\n    print(\"Accuracy on test data = %g\" % accuracy)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Again, for baselining we use LogisticRegression."
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Accuracy on test data = 0.692922\n"
}
],
"source": "#LGReg1\n\nfrom pyspark.ml.classification import LogisticRegression\nlr = LogisticRegression(labelCol=\"HOURLYWindDirectionBucketized\", maxIter=10)\n#,\"ELEVATION\",\"HOURLYStationPressure\",\"HOURLYPressureTendency\",\"HOURLYPrecip\"\n\nvectorAssembler = VectorAssembler(inputCols=[\"HOURLYWindSpeed\",\"HOURLYDRYBULBTEMPC\"],\n outputCol=\"features\")\n\npipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,lr])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nclassification_metrics(prediction)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "Let\u2019s try some other algorithms and see if model performance increases. It\u2019s also important to tweak other parameters, such as the parameters of individual algorithms (e.g. number of trees for RandomForest) or parameters in the feature engineering pipeline (e.g. train/test split ratio, normalization, bucketing, \u2026)."
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Accuracy on test data = 0.722146\n"
}
],
"source": "#RF1\n\nfrom pyspark.ml.classification import RandomForestClassifier\nrf = RandomForestClassifier(labelCol=\"HOURLYWindDirectionBucketized\", numTrees=10)\n\nvectorAssembler = VectorAssembler(inputCols=[\"HOURLYWindSpeed\",\"HOURLYDRYBULBTEMPC\",\"ELEVATION\",\"HOURLYStationPressure\",\"HOURLYPressureTendency\",\"HOURLYPrecip\"],\n outputCol=\"features\")\n\npipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nclassification_metrics(prediction)"
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Accuracy on test data = 0.730137\n"
}
],
"source": "#GBT2\n\nfrom pyspark.ml.classification import GBTClassifier\ngbt = GBTClassifier(labelCol=\"HOURLYWindDirectionBucketized\", maxIter=100)\n\nvectorAssembler = VectorAssembler(inputCols=[\"HOURLYWindSpeed\",\"HOURLYDRYBULBTEMPC\",\"ELEVATION\",\"HOURLYStationPressure\",\"HOURLYPressureTendency\",\"HOURLYPrecip\"],\n outputCol=\"features\")\n\npipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])\nmodel = pipeline.fit(df_train)\nprediction = model.transform(df_test)\nclassification_metrics(prediction)"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": ""
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
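The regression cells in the notebook above report RMSE and the classification cells report accuracy, both computed by Spark's evaluators. As a rough, Spark-free illustration of what those two numbers measure, here is a minimal sketch in plain Python. The helper names `rmse` and `accuracy` and all sample values are made up for this example; they are not part of the notebook or of PySpark.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: square root of the mean squared residual."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def accuracy(labels, predictions):
    """Fraction of predictions that exactly match their label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    hits = sum(1 for y, p in zip(labels, predictions) if y == p)
    return hits / len(labels)


# Hypothetical wind speeds (regression) and bucket labels (classification).
wind_true = [10.0, 12.0, 8.0, 15.0]
wind_pred = [11.0, 10.0, 8.0, 18.0]
bucket_true = [0.0, 1.0, 1.0, 0.0]
bucket_pred = [0.0, 1.0, 0.0, 0.0]

print("RMSE     =", rmse(wind_true, wind_pred))
print("accuracy =", accuracy(bucket_true, bucket_pred))
```

RMSE is expressed in the unit of the label (here wind speed) and has no upper bound, whereas accuracy always lies between 0 and 1, so the two scores should not be compared with each other.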
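The pipelines in the notebook pass the assembled features through `Normalizer(p=1.0)` and derive the classification label with `Bucketizer(splits=[0, 180, Inf])`. The sketch below mimics both steps in plain Python to make their row-wise behaviour visible. `l1_normalize` and `bucketize` are hypothetical helpers written for illustration only; in particular, how they treat zero vectors and out-of-range values is an assumption of this sketch, not a statement about Spark's implementation.

```python
import math


def l1_normalize(vector):
    """Scale one row so that the absolute values of its entries sum to 1."""
    norm = sum(abs(x) for x in vector)
    if norm == 0:
        # Assumption of this sketch: an all-zero row is returned unchanged.
        return list(vector)
    return [x / norm for x in vector]


def bucketize(value, splits):
    """Return the index of the bucket [splits[i], splits[i+1]) holding value.

    The last bucket also includes its upper bound. Values outside the
    splits (or NaN) raise an error in this sketch.
    """
    if math.isnan(value) or value < splits[0] or value > splits[-1]:
        raise ValueError("value %r lies outside the splits" % value)
    for i in range(len(splits) - 1):
        if splits[i] <= value < splits[i + 1]:
            return float(i)
    # Only reached when value equals the upper bound of the last bucket.
    return float(len(splits) - 2)


# Same split points as in the notebook's Bucketizer.
WIND_SPLITS = [0, 180, float("inf")]

# Hypothetical row: wind direction, elevation, station pressure.
row = [270.0, 3.4, 29.97]
print(l1_normalize(row))
print(bucketize(90.0, WIND_SPLITS), bucketize(270.0, WIND_SPLITS))
```

Note that L1 normalization rescales each row on its own, so the absolute magnitude of the measurements is lost. This may be one reason why the linear regression on `features_norm` did not improve on the baseline, although the notebook does not investigate the cause.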