
@Mageswaran1989
Created January 30, 2020 03:24
A gentle introduction to Spark Datasets
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"authors = [\"mageswaran\"] \n",
"categories = [\"Apache Spark\", \"Distributed Computing\"] \n",
"date = \"2019-04-07T14:30:00+05:30\" \n",
"description = \"Apache Spark Dataset API Usage\" \n",
"tags = [\"SQL\", \"Dataset\", \"DataFrame\", \"Spark\"] \n",
"title = \"Apache Spark Dataset API Usage\" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# [Apache Spark Dataset](https://spark.apache.org/docs/latest/sql-programming-guide.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- Java: see https://linuxize.com/post/install-java-on-ubuntu-18-04/\n",
"- Toree\n",
" - This notebook runs on the Apache Toree Spark kernel. The installation guide is at https://toree.apache.org/docs/current/user/installation/\n",
"\n",
" ```\n",
" #Use the Java 8 version of OpenJDK\n",
" sudo update-alternatives --config java\n",
" sudo update-alternatives --config javac\n",
"\n",
" export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/\n",
" pip install toree\n",
" jupyter toree install --interpreters=Scala,PySpark,SQL --spark_home=/opt/data/spark-2.4.0-bin-hadoop2.7/ --user\n",
" ```\n",
" \n",
"This notebook explores the Dataset API."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Some Caveats\n",
"- The types of the columns disappear when running untyped transformations\n",
"- The names of the columns sometimes disappear, partially or completely, when a typed transformation is used\n",
"- Missing values need explicit handling\n",
"\n",
"\n",
"DataFrame + Data Types (`case class`) = Dataset\n",
"\n",
"## Untyped transformations \n",
"- Example: if you add a new column to a Dataset, the result is a DataFrame, even if you define the type of the new column. \n",
"\n",
"To stay within the Dataset API when using untyped transformations, follow these steps:\n",
"\n",
"* Start with a Dataset\n",
"* Apply the function to it\n",
"* The result is a DataFrame\n",
"* Convert the result back to a Dataset by defining the types of the columns in a case class\n",
"\n",
"## Typed transformations\n",
"When we use a typed transformation, the output is a Dataset with proper types. However, if the columns change (fewer or more columns, or newly created ones), the column names shown in the output are usable only in the DataFrame sense (`\"columnName\"`). The Dataset accessor `_.columnName` does not work in these cases, but you can refer to the columns as `._1`, `._2`, etc. To get proper column names back, use a case class again.\n",
"\n"
]
},
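{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal, self-contained sketch of both caveats, assuming a local `SparkSession` and the hypothetical case classes `Hero` and `HeroWithSpecies`, neither of which is part of this notebook. `withColumn` is an untyped transformation, so its result is a `DataFrame` until it is converted back with `.as[...]`. `map` into a tuple is a typed transformation whose result columns are named `_1`, `_2`, `_3`.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}\n",
"import org.apache.spark.sql.functions.lit\n",
"\n",
"// Hypothetical case classes, used only for this illustration\n",
"case class Hero(name: String, height: Int)\n",
"case class HeroWithSpecies(name: String, height: Int, species: String)\n",
"\n",
"object TypedVsUntyped {\n",
"  // Returns the column names of the re-typed, the tuple-typed and the case-class-typed results\n",
"  def columnNames(spark: SparkSession): (Seq[String], Seq[String], Seq[String]) = {\n",
"    import spark.implicits._\n",
"    // Heights taken from StarWars.csv\n",
"    val heroes: Dataset[Hero] = Seq(Hero(\"Luke Skywalker\", 172), Hero(\"Han Solo\", 180)).toDS()\n",
"\n",
"    // Untyped: withColumn returns a DataFrame (Dataset[Row]), so the Hero type is lost\n",
"    val untyped: DataFrame = heroes.withColumn(\"species\", lit(\"human\"))\n",
"    // Describe the new shape with a case class to get a typed Dataset back\n",
"    val retyped: Dataset[HeroWithSpecies] = untyped.as[HeroWithSpecies]\n",
"\n",
"    // Typed: map keeps a Dataset, but the tuple result has columns named _1, _2, _3\n",
"    val tupled: Dataset[(String, Int, String)] = heroes.map(h => (h.name, h.height, \"human\"))\n",
"    // Mapping into a case class restores meaningful column names\n",
"    val named: Dataset[HeroWithSpecies] = tupled.map { case (n, h, s) => HeroWithSpecies(n, h, s) }\n",
"\n",
"    (retyped.columns.toSeq, tupled.columns.toSeq, named.columns.toSeq)\n",
"  }\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    val spark = SparkSession.builder().master(\"local[*]\").appName(\"typed-vs-untyped\").getOrCreate()\n",
"    try {\n",
"      val (retypedCols, tupleCols, namedCols) = columnNames(spark)\n",
"      println(retypedCols.mkString(\", \")) // name, height, species\n",
"      println(tupleCols.mkString(\", \"))   // _1, _2, _3\n",
"      println(namedCols.mkString(\", \"))   // name, height, species\n",
"    } finally spark.stop()\n",
"  }\n",
"}\n",
"```\n",
"\n",
"Inside this notebook, the `spark` session already exists. Case classes defined in a cell also need the `OuterScopes.addOuterScope(this)` call used in the cells below."
]
},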
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"sqlContext = org.apache.spark.sql.SQLContext@e139cd1\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"org.apache.spark.sql.SQLContext@e139cd1"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}\n",
"import org.apache.spark.sql.functions._\n",
"import org.apache.spark.sql._\n",
"\n",
"val sqlContext = spark.sqlContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 1. CREATING DATASET\n",
"## MANUALLY\n",
"- Define the data as a sequence.\n",
"- Convert the sequence to DataFrame and define the column names\n",
"- Define the types of the columns in a case class (its field names must match the column names)\n",
"- Convert to Dataset"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"df = [name: string, friends: string]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, friends: string]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"//In this example we create a small Dataset with two columns: the first column contains the names of Star Wars \n",
"//characters and the second one lists the names of their friends.\n",
"val df = Seq((\"Yoda\", \"Obi-Wan Kenobi\"),\n",
" (\"Anakin Skywalker\", \"Sheev Palpatine\"),\n",
" (\"Luke Skywalker\", \"Han Solo, Leia Skywalker\"),\n",
" (\"Leia Skywalker\", \"Obi-Wan Kenobi\"),\n",
" (\"Sheev Palpatine\", \"Anakin Skywalker\"),\n",
" (\"Han Solo\", \"Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca\"),\n",
" (\"Obi-Wan Kenobi\", \"Yoda, Qui-Gon Jinn\"),\n",
" (\"R2-D2\", \"C-3PO\"),\n",
" (\"C-3PO\", \"R2-D2\"),\n",
" (\"Darth Maul\", \"Sheev Palpatine\"),\n",
" (\"Chewbacca\", \"Han Solo\"),\n",
" (\"Lando Calrissian\", \"Han Solo\"),\n",
" (\"Jabba\", \"Boba Fett\")\n",
" ).toDF(\"name\", \"friends\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------------------+\n",
"| name| friends|\n",
"+----------------+--------------------+\n",
"| Yoda| Obi-Wan Kenobi|\n",
"|Anakin Skywalker| Sheev Palpatine|\n",
"| Luke Skywalker|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Sheev Palpatine| Anakin Skywalker|\n",
"| Han Solo|Leia Skywalker, L...|\n",
"| Obi-Wan Kenobi| Yoda, Qui-Gon Jinn|\n",
"| R2-D2| C-3PO|\n",
"| C-3PO| R2-D2|\n",
"| Darth Maul| Sheev Palpatine|\n",
"| Chewbacca| Han Solo|\n",
"|Lando Calrissian| Han Solo|\n",
"| Jabba| Boba Fett|\n",
"+----------------+--------------------+\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Array((name,StringType), (friends,StringType))"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.dtypes"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class Friends\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class Friends(name: String, friends: String)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) //Register this REPL scope so Spark encoders can instantiate the case class in other cells"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"friends_ds = [name: string, friends: string]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, friends: string]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val friends_ds = df.as[Friends]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------------------+\n",
"| name| friends|\n",
"+----------------+--------------------+\n",
"| Yoda| Obi-Wan Kenobi|\n",
"|Anakin Skywalker| Sheev Palpatine|\n",
"| Luke Skywalker|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Sheev Palpatine| Anakin Skywalker|\n",
"| Han Solo|Leia Skywalker, L...|\n",
"| Obi-Wan Kenobi| Yoda, Qui-Gon Jinn|\n",
"| R2-D2| C-3PO|\n",
"| C-3PO| R2-D2|\n",
"| Darth Maul| Sheev Palpatine|\n",
"| Chewbacca| Han Solo|\n",
"|Lando Calrissian| Han Solo|\n",
"| Jabba| Boba Fett|\n",
"+----------------+--------------------+\n",
"\n"
]
}
],
"source": [
"friends_ds.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## MISSING VALUES"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class Friends_Missing\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class Friends_Missing(Who: String, friends: Option[String])\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"ds_missing = [Who: string, friends: string]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[Who: string, friends: string]"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val ds_missing = Seq( \n",
" (\"Yoda\", Some(\"Obi-Wan Kenobi\")),\n",
" (\"Anakin Skywalker\", Some(\"Sheev Palpatine\")),\n",
" (\"Luke Skywalker\", None),\n",
" (\"Leia Skywalker\", Some(\"Obi-Wan Kenobi\")),\n",
" (\"Sheev Palpatine\", Some(\"Anakin Skywalker\")),\n",
" (\"Han Solo\", Some(\"Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi\"))\n",
" ).toDF(\"Who\", \"friends\").as[Friends_Missing]"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------------------+\n",
"| Who| friends|\n",
"+----------------+--------------------+\n",
"| Yoda| Obi-Wan Kenobi|\n",
"|Anakin Skywalker| Sheev Palpatine|\n",
"| Luke Skywalker| null|\n",
"| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Sheev Palpatine| Anakin Skywalker|\n",
"| Han Solo|Leia Skywalker, L...|\n",
"+----------------+--------------------+\n",
"\n"
]
}
],
"source": [
"ds_missing.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## READING FROM CSV\n",
"\n",
"The steps for reading a CSV file:\n",
"\n",
"- Define the names and the types of the columns in a case class. Note that the field names must be identical to the column names in the header of the file!\n",
"- Read the CSV into a DataFrame\n",
"- Convert it into a Dataset\n",
"\n",
"The read returns a DataFrame. As we have seen earlier, the `.as[Characters]` at the end of the expression converts it to a Dataset."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"// The %%pyspark magic is not available in this kernel (\"Magic pyspark does not exist!\"), so call wget from Scala instead\n",
"import scala.sys.process._\n",
"\n",
"\"wget https://www.balabit.com/blog/wp-content/uploads/2016/12/StarWars.csv\".!"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class Characters\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class Characters(name: String, \n",
" height: Integer, \n",
" weight: Option[Integer], \n",
" eyecolor: Option[String], \n",
" haircolor: Option[String], \n",
" jedi: String,\n",
" species: String)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"characters_ds = [name: string, height: int ... 5 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 5 more fields]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val characters_ds: Dataset[Characters] = sqlContext.\n",
" read.\n",
" option(\"header\", \"true\").\n",
" option(\"delimiter\", \";\").\n",
" option(\"inferSchema\", \"true\").\n",
" csv(\"StarWars.csv\").\n",
" as[Characters]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notes on the read options:\n",
"\n",
"- `option(\"header\", \"true\")`: the column names are defined in the first row of the file\n",
"- `option(\"delimiter\", \";\")`: the fields are separated by `;`\n",
"- `option(\"inferSchema\", \"true\")`: detect the column types automatically. The schema can also be given manually (see the section on defining the schema manually below)."
]
},
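{
"cell_type": "markdown",
"metadata": {},
"source": [
"One way to give the schema manually is to derive it from the case class itself. This is a minimal standalone sketch. It assumes a local `SparkSession` and `StarWars.csv` in the working directory. `ReadWithCaseClassSchema` is a hypothetical name, and the case class repeats the fields of `Characters` above. `Encoders.product[...].schema` turns the case class into a `StructType`, which then replaces `inferSchema`. By default (the `enforceSchema` CSV option), Spark applies a user-supplied schema to the columns by position, so the field order must follow the file.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.{Dataset, Encoders, SparkSession}\n",
"import org.apache.spark.sql.types.StructType\n",
"\n",
"// Same fields, in the same order as the CSV columns, as the Characters case class above\n",
"case class Characters(name: String,\n",
"                      height: Integer,\n",
"                      weight: Option[Integer],\n",
"                      eyecolor: Option[String],\n",
"                      haircolor: Option[String],\n",
"                      jedi: String,\n",
"                      species: String)\n",
"\n",
"object ReadWithCaseClassSchema {\n",
"  // Derive the schema from the case class instead of letting inferSchema guess the types\n",
"  val schema: StructType = Encoders.product[Characters].schema\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    val spark = SparkSession.builder().master(\"local[*]\").appName(\"csv-with-schema\").getOrCreate()\n",
"    import spark.implicits._\n",
"    try {\n",
"      val characters: Dataset[Characters] = spark.read\n",
"        .option(\"header\", \"true\")\n",
"        .option(\"delimiter\", \";\")\n",
"        .schema(schema)\n",
"        .csv(\"StarWars.csv\") // assumes StarWars.csv is in the working directory\n",
"        .as[Characters]\n",
"      characters.printSchema()\n",
"    } finally spark.stop()\n",
"  }\n",
"}\n",
"```"
]
},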
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human|\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|\n",
"| Dooku| 193| 86| brown| brown| jedi| human|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"\n"
]
}
],
"source": [
"characters_ds.show()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Array((name,StringType), (height,IntegerType), (weight,IntegerType), (eyecolor,StringType), (haircolor,StringType), (jedi,StringType), (species,StringType))"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"characters_ds.dtypes"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+------+------+--------+---------+----+-------+\n",
"|name|height|weight|eyecolor|haircolor|jedi|species|\n",
"+----+------+------+--------+---------+----+-------+\n",
"+----+------+------+--------+---------+----+-------+\n",
"\n"
]
}
],
"source": [
"characters_ds.filter(x => x.eyecolor == \"brown\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
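"This doesn't work: the filter returns no rows.\n",
"`eyecolor` is declared as `Option[String]` in the `case class`, so `x.eyecolor == \"brown\"` compares an `Option` with a `String` and is always false. The value has to be compared as `Some(\"brown\")`.\n",
"Although `show()` does not reveal which columns are `Option` types, we have to keep it in mind when working with them later."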
]
},
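{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same pitfall can be reproduced in plain Scala, without Spark. This is a minimal sketch that assumes Scala 2.x, whose compiler accepts the `Option`-vs-`String` comparison with only a warning. `hasBrownEyes` is a hypothetical helper that shows the `Option`-aware alternative `contains`.\n",
"\n",
"```scala\n",
"object OptionEquality {\n",
"  // Hypothetical helper: true only when the eye color is present and equal to \"brown\"\n",
"  def hasBrownEyes(eyecolor: Option[String]): Boolean = eyecolor.contains(\"brown\")\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    val eyecolor: Option[String] = Some(\"brown\")\n",
"\n",
"    // Option[String] vs String: compiles in Scala 2 (with a warning) but is always false\n",
"    println(eyecolor == \"brown\")       // false\n",
"    // Compare an Option with an Option instead ...\n",
"    println(eyecolor == Some(\"brown\")) // true\n",
"    // ... or test the wrapped value; a missing value (None) simply yields false\n",
"    println(hasBrownEyes(eyecolor))    // true\n",
"    println(hasBrownEyes(None))        // false\n",
"  }\n",
"}\n",
"```\n",
"\n",
"In the notebook, the equivalent filter would be `characters_ds.filter(x => x.eyecolor.contains(\"brown\"))`."
]
},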
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species|\n",
"+----------------+------+------+--------+---------+-------+-------+\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|\n",
"| Dooku| 193| 86| brown| brown| jedi| human|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human|\n",
"+----------------+------+------+--------+---------+-------+-------+\n",
"\n"
]
}
],
"source": [
"characters_ds.filter(x => x.eyecolor == Some(\"brown\")).show()"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"What happens if we do not account for null values in a non-string column? Let's see!"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class Characters_BadType\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"\n",
"case class Characters_BadType(name: String,\n",
" height: Integer, \n",
" weight: Integer, \n",
" eyecolor: String, \n",
" haircolor: String, \n",
" jedi: String,\n",
" species: String)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"characters_BadType_ds = [name: string, height: int ... 5 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 5 more fields]"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val characters_BadType_ds: Dataset[Characters_BadType] = sqlContext.\n",
" read.\n",
" option(\"header\", \"true\").\n",
" option(\"delimiter\", \";\").\n",
" option(\"inferSchema\", \"true\").\n",
" csv(\"StarWars.csv\").\n",
" as[Characters_BadType]"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human|\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|\n",
"| Dooku| 193| 86| brown| brown| jedi| human|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"\n"
]
}
],
"source": [
"characters_BadType_ds.show()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"characters_BadType_ds2 = [name: string, height: int ... 5 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 5 more fields]"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val characters_BadType_ds2 = characters_BadType_ds.filter(x=> x.jedi==\"no_jedi\")"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"\n"
]
}
],
"source": [
"characters_BadType_ds2.show()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+------+------+--------+---------+-------+-------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species|\n",
"+--------------+------+------+--------+---------+-------+-------+\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"|Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi|wookiee|\n",
"+--------------+------+------+--------+---------+-------+-------+\n",
"\n"
]
}
],
"source": [
"characters_BadType_ds2.filter(x=> x.haircolor==\"brown\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
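"**Error expected:** `weight` is a plain `Integer` in `Characters_BadType`, and Jabba's weight is null. Unboxing it for the comparison `x.weight > 79` therefore throws a `NullPointerException` (see `scala.Predef$.Integer2int` in the stack trace)."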
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Name: org.apache.spark.SparkException\n",
"Message: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost, executor driver): java.lang.NullPointerException\n",
"\tat scala.Predef$.Integer2int(Predef.scala:362)\n",
"\tat $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:35)\n",
"\tat $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:35)\n",
"\tat org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters$$anonfun$org$apache$spark$sql$catalyst$optimizer$CombineTypedFilters$$combineFilterFunction$4.apply(objects.scala:96)\n",
"\tat org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters$$anonfun$org$apache$spark$sql$catalyst$optimizer$CombineTypedFilters$$combineFilterFunction$4.apply(objects.scala:95)\n",
"\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n",
"\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n",
"\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)\n",
"\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)\n",
"\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)\n",
"\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)\n",
"\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)\n",
"\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n",
"\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n",
"\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n",
"\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n",
"\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n",
"\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n",
"\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n",
"\tat org.apache.spark.scheduler.Task.run(Task.scala:121)\n",
"\tat org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)\n",
"\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n",
"\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)\n",
"\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n",
"\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n",
"\tat java.lang.Thread.run(Thread.java:748)\n",
"Driver stacktrace:\n",
" at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)\n",
" at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)\n",
" at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)\n",
" at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n",
" at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n",
" at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)\n",
" at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n",
" at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n",
" at scala.Option.foreach(Option.scala:257)\n",
" at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n",
" at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)\n",
" at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)\n",
" at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)\n",
" at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n",
" at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)\n",
" at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)\n",
" at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)\n",
" at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)\n",
" at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)\n",
" at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)\n",
" at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)\n",
" at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)\n",
" at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)\n",
" at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)\n",
" at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)\n",
" at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)\n",
" at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)\n",
" at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)\n",
" at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)\n",
" at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)\n",
" at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)\n",
" at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)\n",
" at org.apache.spark.sql.Dataset.show(Dataset.scala:746)\n",
" at org.apache.spark.sql.Dataset.show(Dataset.scala:705)\n",
" at org.apache.spark.sql.Dataset.show(Dataset.scala:714)\n",
" ... 46 elided\n",
"Caused by: java.lang.NullPointerException\n",
" at scala.Predef$.Integer2int(Predef.scala:362)"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"characters_BadType_ds2.filter(x=> x.weight>79).show()\n"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------+------+------+--------+---------+-------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species|\n",
"+----------+------+------+--------+---------+-------+-----------+\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"|Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|\n",
"+----------+------+------+--------+---------+-------+-----------+\n",
"\n"
]
},
{
"data": {
"text/plain": [
"lastException: Throwable = null\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"characters_BadType_ds2.filter(x=> x.weight!=null && x.weight>79).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The conclusion: if you cannot be sure that every value in a column is defined, it is safer to declare that field as an Option in the case class. Use types without Option[] only for columns that are guaranteed to have no missing values. This applies to numeric, string, and all other types."
]
},
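{
"cell_type": "markdown",
"metadata": {},
"source": [
"The error raised by `filter(x=> x.weight>79)` above most likely comes from unboxing a `null` `Integer`. The mechanism can be reproduced without Spark. Below is a minimal pure-Scala sketch that uses two hypothetical case classes, `CharNullable` and `CharOptional`, which are not part of the notebook. Comparing a `java.lang.Integer` field with `79` implicitly unboxes it, and unboxing `null` throws a `NullPointerException`. An `Option` field, by contrast, makes the caller handle the missing value.\n",
"\n",
"```scala\n",
"// Hypothetical case classes for illustration only.\n",
"case class CharNullable(name: String, weight: Integer)\n",
"case class CharOptional(name: String, weight: Option[Int])\n",
"\n",
"val nullable = Seq(CharNullable(\"Han Solo\", 80), CharNullable(\"Jabba\", null))\n",
"val optional = Seq(CharOptional(\"Han Solo\", Some(80)), CharOptional(\"Jabba\", None))\n",
"\n",
"// x.weight > 79 unboxes the Integer to an Int; for Jabba the value is null,\n",
"// so the comparison throws a NullPointerException, which Try captures.\n",
"val failed = scala.util.Try(nullable.filter(x => x.weight > 79)).isFailure\n",
"\n",
"// With Option the missing value is explicit: exists returns false for None.\n",
"val heavy = optional.filter(x => x.weight.exists(_ > 79)).map(_.name)\n",
"```\n",
"\n",
"Here `failed` is `true` and `heavy` contains only `\"Han Solo\"`. The same idea carries over to a Dataset: with an `Option` field, the predicate becomes `x.weight.exists(_ > 79)`."
]
},
```scala
case class CharNullable(name: String, weight: Integer)
case class CharOptional(name: String, weight: Option[Int])
val nullable = Seq(CharNullable("Han Solo", 80), CharNullable("Jabba", null))
val optional = Seq(CharOptional("Han Solo", Some(80)), CharOptional("Jabba", None))
val failed = scala.util.Try(nullable.filter(x => x.weight > 79)).isFailure
val heavy = optional.filter(x => x.weight.exists(_ > 79)).map(_.name)
```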
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### DEFINING THE SCHEMA OF THE DATAFRAME MANUALLY\n",
"This part is related to DataFrames rather than Datasets.\n",
"When reading the CSV file into a DataFrame, we can define the schema manually. This suggests that we could control (or detect) missing values during reading by declaring `nullable = false` in the schema. Let's try it. The first step is to create the schema by defining each column's name, its type, and whether it is nullable. Before creating the schema, import the required types."
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DF_schema = StructType(StructField(name,StringType,false), StructField(height,IntegerType,false), StructField(weight,IntegerType,false), StructField(eyecolor,StringType,false), StructField(haircolor,StringType,false), StructField(jedi,StringType,false), StructField(species,StringType,false))\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"StructType(StructField(name,StringType,false), StructField(height,IntegerType,false), StructField(weight,IntegerType,false), StructField(eyecolor,StringType,false), StructField(haircolor,StringType,false), StructField(jedi,StringType,false), StructField(species,StringType,false))"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val DF_schema = StructType(Array(\n",
" StructField(\"name\", StringType, false),\n",
" StructField(\"height\", IntegerType, false),\n",
" StructField(\"weight\", IntegerType, false),\n",
" StructField(\"eyecolor\", StringType, false),\n",
" StructField(\"haircolor\", StringType, false),\n",
" StructField(\"jedi\", StringType, false),\n",
" StructField(\"species\", StringType, false)))"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: string (nullable = false)\n",
" |-- height: integer (nullable = false)\n",
" |-- weight: integer (nullable = false)\n",
" |-- eyecolor: string (nullable = false)\n",
" |-- haircolor: string (nullable = false)\n",
" |-- jedi: string (nullable = false)\n",
" |-- species: string (nullable = false)\n",
"\n"
]
}
],
"source": [
"DF_schema.printTreeString"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"characters1_df = [name: string, height: int ... 5 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 5 more fields]"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// .csv(path) selects the built-in CSV source by itself, so the legacy\n",
"// format(\"com.databricks.spark.csv\") call is not needed here.\n",
"val characters1_df = sqlContext.\n",
" read.\n",
" option(\"header\", \"true\").\n",
" option(\"delimiter\", \";\").\n",
" schema(DF_schema).\n",
" csv(\"StarWars.csv\")"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human|\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|\n",
"| Dooku| 193| 86| brown| brown| jedi| human|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human|\n",
"+----------------+------+------+--------+---------+-------+-----------+\n",
"\n"
]
}
],
"source": [
"characters1_df.show()"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: string (nullable = true)\n",
" |-- height: integer (nullable = true)\n",
" |-- weight: integer (nullable = true)\n",
" |-- eyecolor: string (nullable = true)\n",
" |-- haircolor: string (nullable = true)\n",
" |-- jedi: string (nullable = true)\n",
" |-- species: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"characters1_df.printSchema"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
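"Did you notice? The schema declared every column with `nullable = false`, yet **nullable** has become **true** for every column of the DataFrame. When reading files, Spark treats a user-supplied schema as nullable, so `nullable = false` cannot reject or detect missing values at read time. Jabba's `weight` is still `null` in the output above, and the filter below simply skips that row."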
]
},
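{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the schema cannot enforce non-null values during reading, missing values have to be found after loading. Below is a minimal sketch that assumes a local `SparkSession` and uses a small hypothetical DataFrame in place of `characters1_df`. It relies on the fact that `count` ignores nulls, so `count(when(col(c).isNull, 1))` counts the missing values in column `c`.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.SparkSession\n",
"import org.apache.spark.sql.functions.{col, count, when}\n",
"\n",
"// getOrCreate returns the notebook's existing session if there is one.\n",
"val spark = SparkSession.builder().master(\"local[*]\").getOrCreate()\n",
"import spark.implicits._\n",
"\n",
"// Hypothetical stand-in for characters1_df: Jabba's weight is missing.\n",
"val df = Seq((\"Han Solo\", Some(80)), (\"Jabba\", None: Option[Int])).toDF(\"name\", \"weight\")\n",
"\n",
"// One output column per input column, holding the number of nulls in it.\n",
"val nullCounts = df.select(df.columns.map(c => count(when(col(c).isNull, 1)).alias(c)): _*)\n",
"nullCounts.show()\n",
"```\n",
"\n",
"For this data, the `weight` column reports one missing value and `name` reports none. Applied to `characters1_df`, the same expression would flag the columns that contain `null`s despite `nullable = false` in the schema."
]
},
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, when}
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val df = Seq(("Han Solo", Some(80)), ("Jabba", None: Option[Int])).toDF("name", "weight")
val nullCounts = df.select(df.columns.map(c => count(when(col(c).isNull, 1)).alias(c)): _*)
```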
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+------+------+--------+---------+-------+-------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species|\n",
"+--------------+------+------+--------+---------+-------+-------+\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human|\n",
"|Leia Skywalker| 150| 49| brown| brown|no_jedi| human|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|\n",
"+--------------+------+------+--------+---------+-------+-------+\n",
"\n"
]
}
],
"source": [
"characters1_df.filter($\"weight\"<75).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. JOINING DATASETS"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We continue with two Datasets: `friends_ds`, which was created manually, and `characters_ds`, which was read from the CSV file. Let's join them on the characters' names.\n",
"\n",
"**Inner join** \n",
"With an inner join, the result contains only the keys present in both Datasets.\n",
"Unfortunately, when the join condition is given as a column expression, Spark keeps the key columns of both Datasets. Because both Datasets have a `name` column, the result is a DataFrame with two columns called `name`. Referring to `name` by its bare name afterwards then fails with the following error:\n",
"`Reference 'name' is ambiguous`"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"bad_join_df = [name: string, height: int ... 7 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val bad_join_df = characters_ds.join(friends_ds, characters_ds.col(\"name\") === friends_ds.col(\"name\"))"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| name| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human|Anakin Skywalker| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human| Luke Skywalker|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Obi-Wan Kenobi| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human| Han Solo|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Sheev Palpatine| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| R2-D2| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| C-3PO| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Darth Maul| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Chewbacca| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Jabba| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|Lando Calrissian| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"\n"
]
}
],
"source": [
"bad_join_df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**!!! Did you notice?** There are two `name` columns."
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Name: org.apache.spark.sql.AnalysisException\n",
"Message: Reference 'name' is ambiguous, could be: name, name.;\n",
"StackTrace: at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)\n",
" at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$39.apply(Analyzer.scala:889)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$39.apply(Analyzer.scala:891)\n",
" at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:888)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:897)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:897)\n",
" at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)\n",
" at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)\n",
" at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:897)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:957)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:957)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)\n",
" at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121)\n",
" at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n",
" at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n",
" at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n",
" at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n",
" at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)\n",
" at scala.collection.AbstractTraversable.map(Traversable.scala:104)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126)\n",
" at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)\n",
" at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:957)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:900)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1$$anonfun$apply$1.apply(AnalysisHelper.scala:90)\n",
" at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:89)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveOperatorsUp$1.apply(AnalysisHelper.scala:86)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.resolveOperatorsUp(AnalysisHelper.scala:86)\n",
" at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:900)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:758)\n",
" at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)\n",
" at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)\n",
" at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)\n",
" at scala.collection.immutable.List.foldLeft(List.scala:84)\n",
" at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)\n",
" at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)\n",
" at scala.collection.immutable.List.foreach(List.scala:392)\n",
" at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:127)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:121)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:106)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)\n",
" at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)\n",
" at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)\n",
" at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)\n",
" at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)\n",
" at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)\n",
" at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)\n",
" at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3407)\n",
" at org.apache.spark.sql.Dataset.select(Dataset.scala:1335)"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bad_join_df.select($\"name\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When the key columns have the same name in both Datasets, the solution is to pass the join key as `Seq(\"name\")`. Spark then keeps a single `name` column in the result."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"sw_df = [name: string, height: int ... 6 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"lastException: Throwable = null\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 6 more fields]"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val sw_df = characters_ds.join(friends_ds, Seq(\"name\"))"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_df.show()"
]
},
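{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `Seq(\"name\")` form also accepts a join type as a third argument. Another option is to keep the column-expression join and then drop the duplicate key through its qualified reference. Below is a minimal sketch of both approaches. It assumes a `SparkSession`, and the case classes `Person` and `Friendship` are hypothetical stand-ins for the notebook's Datasets.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.SparkSession\n",
"\n",
"// Hypothetical stand-ins for the characters and friends case classes.\n",
"case class Person(name: String, weight: Option[Int])\n",
"case class Friendship(name: String, friends: String)\n",
"// Needed for case classes defined in the notebook, as in the cells above.\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)\n",
"\n",
"val spark = SparkSession.builder().master(\"local[*]\").getOrCreate()\n",
"import spark.implicits._\n",
"\n",
"val people = Seq(Person(\"Han Solo\", Some(80)), Person(\"Jabba\", None)).toDS()\n",
"val friendships = Seq(Friendship(\"Han Solo\", \"Chewbacca\")).toDS()\n",
"\n",
"// 1. Join keys given by name: a single name column, here with a left outer join.\n",
"val usingCols = people.join(friendships, Seq(\"name\"), \"left_outer\")\n",
"\n",
"// 2. Column-expression join, then drop the right-hand key by its qualified reference.\n",
"val dropped = people.join(friendships, people(\"name\") === friendships(\"name\")).drop(friendships(\"name\"))\n",
"```\n",
"\n",
"Both results have the columns `name`, `weight` and `friends`. `usingCols` keeps Jabba with a `null` in `friends`, while the inner join in `dropped` keeps only Han Solo."
]
},
```scala
import org.apache.spark.sql.SparkSession
case class Person(name: String, weight: Option[Int])
case class Friendship(name: String, friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val people = Seq(Person("Han Solo", Some(80)), Person("Jabba", None)).toDS()
val friendships = Seq(Friendship("Han Solo", "Chewbacca")).toDS()
val usingCols = people.join(friendships, Seq("name"), "left_outer")
val dropped = people.join(friendships, people("name") === friendships("name")).drop(friendships("name"))
```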
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Although both inputs of the inner join were typed Datasets, so every column type was known, the result of `join` is a DataFrame.\n",
"To get a Dataset again, create a case class with the names and types of the joined columns, then convert the DataFrame to a Dataset with `.as[...]`."
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class SW\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class SW(name: String,\n",
" height: Integer,\n",
" weight: Option[Integer],\n",
" eyecolor: Option[String],\n",
" haircolor: Option[String],\n",
" jedi: String,\n",
" species: String,\n",
" friends: String)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"sw_ds = [name: string, height: int ... 6 more fields]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, height: int ... 6 more fields]"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val sw_ds = sw_df.as[SW]"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.show()"
]
},
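{
"cell_type": "markdown",
"metadata": {},
"source": [
"`joinWith` is an alternative that keeps the static types without defining a new case class, because it returns a Dataset of pairs instead of a DataFrame. Below is a minimal sketch that assumes a `SparkSession`, with the case classes `Person` and `Friendship` as hypothetical stand-ins for the notebook's Datasets.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.{Dataset, SparkSession}\n",
"\n",
"// Hypothetical stand-ins for the characters and friends case classes.\n",
"case class Person(name: String, weight: Option[Int])\n",
"case class Friendship(name: String, friends: String)\n",
"// Needed for case classes defined in the notebook, as in the cells above.\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)\n",
"\n",
"val spark = SparkSession.builder().master(\"local[*]\").getOrCreate()\n",
"import spark.implicits._\n",
"\n",
"val people = Seq(Person(\"Han Solo\", Some(80)), Person(\"Yoda\", Some(17))).toDS()\n",
"val friendships = Seq(Friendship(\"Han Solo\", \"Chewbacca\")).toDS()\n",
"\n",
"// Inner join on the name condition; each row is a (Person, Friendship) pair.\n",
"val pairs: Dataset[(Person, Friendship)] =\n",
"  people.joinWith(friendships, people(\"name\") === friendships(\"name\"))\n",
"\n",
"// Typed fields are available directly on both sides of the pair.\n",
"val names = pairs.map(pair => (pair._1.name, pair._2.friends)).collect()\n",
"```\n",
"\n",
"The trade-off is that the result is nested: it has two struct columns, `_1` and `_2`, one per side. Flattening it for display still requires a `select` or a `map`."
]
},
```scala
import org.apache.spark.sql.{Dataset, SparkSession}
case class Person(name: String, weight: Option[Int])
case class Friendship(name: String, friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val people = Seq(Person("Han Solo", Some(80)), Person("Yoda", Some(17))).toDS()
val friendships = Seq(Friendship("Han Solo", "Chewbacca")).toDS()
val pairs: Dataset[(Person, Friendship)] =
  people.joinWith(friendships, people("name") === friendships("name"))
val names = pairs.map(pair => (pair._1.name, pair._2.friends)).collect()
```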
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Other joins\n",
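"If we need to keep all the keys from the left or the right Dataset, we can use `\"left_outer\"` or `\"right_outer\"`, respectively. Rows without a match get `null` in the columns that come from the other Dataset."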
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| Who| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human|Anakin Skywalker| Sheev Palpatine|\n",
"| Padme Amidala| 165| 45| brown| brown|no_jedi| human| null| null|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human| Luke Skywalker| null|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human| null| null|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| null| null|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human| Han Solo|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Sheev Palpatine| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| null| null|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| null| null|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| null| null|\n",
"| Dooku| 193| 86| brown| brown| jedi| human| null| null|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| null| null|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| null| null|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| null| null|\n",
"| Boba Fett| 183| 78| brown| black|no_jedi| human| null| null|\n",
"| Jango Fett| 183| 79| brown| black|no_jedi| human| null| null|\n",
"+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+\n",
"\n"
]
}
],
"source": [
"characters_ds.join(ds_missing, characters_ds.col(\"name\") === ds_missing.col(\"Who\"), \"left_outer\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. SELECTING COLUMNS\n",
"The first surprise was how overcomplicated it is to select some columns from a Dataset. We already know the names and types of the columns, but if we need only some of them, we have to define the names or the types again. Let's see the possibilities:\n",
"\n",
"- If we use `map`, the result is a Dataset, so the column types are inherited, but the column names are lost.\n",
"- If we use `select` with the column names, the result is a DataFrame, so the column types are lost.\n",
"- If we use `select` and provide the column names AND the column types, the result is a Dataset with seemingly proper column names and proper types.\n",
"\n",
"In short:\n",
"\n",
" - `map`: Dataset -> Dataset, column names lost\n",
" - `select` + column names: DataFrame, column types lost\n",
" - `select` + column names & types: Dataset"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[_1: string, _2: int]"
]
},
"execution_count": 42,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.map(x => (x.name, x.weight))"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+----+\n",
"| _1| _2|\n",
"+----------------+----+\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Obi-Wan Kenobi| 77|\n",
"| Han Solo| 80|\n",
"| Sheev Palpatine| 75|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Yoda| 17|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"| Jabba|null|\n",
"|Lando Calrissian| 79|\n",
"+----------------+----+\n",
"\n"
]
}
],
"source": [
"sw_ds.map(x => (x.name, x.weight)).show()"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, weight: int]"
]
},
"execution_count": 44,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.select(\"name\", \"weight\")"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+\n",
"| name|weight|\n",
"+----------------+------+\n",
"| Yoda| 17|\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Sheev Palpatine| 75|\n",
"| Han Solo| 80|\n",
"| Obi-Wan Kenobi| 77|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"|Lando Calrissian| 79|\n",
"| Jabba| null|\n",
"+----------------+------+\n",
"\n"
]
}
],
"source": [
"sw_ds.select(\"name\", \"weight\").show()"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, weight: int]"
]
},
"execution_count": 46,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.select($\"name\".as[String], $\"weight\".as[Integer])"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+\n",
"| name|weight|\n",
"+----------------+------+\n",
"| Yoda| 17|\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Sheev Palpatine| 75|\n",
"| Han Solo| 80|\n",
"| Obi-Wan Kenobi| 77|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"|Lando Calrissian| 79|\n",
"| Jabba| null|\n",
"+----------------+------+\n",
"\n"
]
}
],
"source": [
"sw_ds.select($\"name\".as[String], $\"weight\".as[Integer]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This last solution seems to work well, but it has two problems:\n",
"\n",
"1. The result is a `Dataset[(String, Integer)]`. The column names appear in the display, but they are valid only when the Dataset is used as a DataFrame. We can refer to the column as `\"weight\"` in untyped expressions (for example `.select(\"weight\")`), but not in typed expressions that need `_.weight`. For example, `groupByKey(_.weight)` or `.map(x => x.weight)` after this selection step fails with the following error:\n",
"`error: value weight is not a member of (String, Integer)`\n",
"In typed operations, we have to refer to the columns as `._1` or `._2` instead of by name. So the names survive in the DataFrame sense but are lost in the Dataset sense.\n",
"2. When defining `$\"weight\".as[Integer]`, we cannot use `$\"weight\".as[Option[Integer]]`. Because the column has a missing value, this can lead to a `NullPointerException`, for example with `filter(x => x._2 > 79)`.\n",
"\n",
"Whichever way the selection is done, we end up creating a proper case class. A new case class corrects all three approaches easily:"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class NameWeight\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class NameWeight(name: String, weight: Option[Integer])\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, weight: int]"
]
},
"execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\n",
"//1. corrected\n",
"sw_ds.map(x => NameWeight(x.name, x.weight))"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+\n",
"| name|weight|\n",
"+----------------+------+\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Obi-Wan Kenobi| 77|\n",
"| Han Solo| 80|\n",
"| Sheev Palpatine| 75|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Yoda| 17|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"| Jabba| null|\n",
"|Lando Calrissian| 79|\n",
"+----------------+------+\n",
"\n"
]
}
],
"source": [
"sw_ds.map(x => NameWeight(x.name, x.weight)).show()"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, weight: int]"
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"//2. corrected\n",
"sw_ds.select(\"name\", \"weight\").as[NameWeight]"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+\n",
"| name|weight|\n",
"+----------------+------+\n",
"| Yoda| 17|\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Sheev Palpatine| 75|\n",
"| Han Solo| 80|\n",
"| Obi-Wan Kenobi| 77|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"|Lando Calrissian| 79|\n",
"| Jabba| null|\n",
"+----------------+------+\n",
"\n"
]
}
],
"source": [
"sw_ds.select(\"name\", \"weight\").as[NameWeight].show()"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, weight: int]"
]
},
"execution_count": 53,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"//3. corrected\n",
"sw_ds.select($\"name\".as[String], $\"weight\".as[Integer]).as[NameWeight]"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+\n",
"| name|weight|\n",
"+----------------+------+\n",
"| Yoda| 17|\n",
"|Anakin Skywalker| 84|\n",
"| Luke Skywalker| 77|\n",
"| Leia Skywalker| 49|\n",
"| Sheev Palpatine| 75|\n",
"| Han Solo| 80|\n",
"| Obi-Wan Kenobi| 77|\n",
"| R2-D2| 32|\n",
"| C-3PO| 75|\n",
"| Darth Maul| 80|\n",
"| Chewbacca| 112|\n",
"|Lando Calrissian| 79|\n",
"| Jabba| null|\n",
"+----------------+------+\n",
"\n"
]
}
],
"source": [
"sw_ds.select($\"name\".as[String], $\"weight\".as[Integer]).as[NameWeight].show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. RENAMING COLUMNS\n",
"By renaming some of the columns we get a DataFrame. (At least I could not find a column renamer function producing a Dataset.)\n",
"\n",
"- If we use withColumnRenamed then we can rename the columns one-by-one, the result is a DataFrame.\n",
"- We can convert the Dataset to a DataFrame and define all the new column names in one step; the result is, of course, a DataFrame.\n",
"\n",
"Whichever way the columns are renamed, the result is a DataFrame. Finally, we have to create a case class for the new column names and types and convert the DataFrame to a Dataset."
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Name: string, height: int ... 6 more fields]"
]
},
"execution_count": 55,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumnRenamed(\"name\", \"Name\")"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| Name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumnRenamed(\"name\", \"Name\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The output shows that the result is a DataFrame.\n",
"We can rename more columns by chaining `withColumnRenamed` calls."
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+--------+-----------+--------------------+\n",
"| Who|height|weight|eyecolor|haircolor|Religion| species| friends|\n",
"+----------------+------+------+--------+---------+--------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown| no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown| no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red| no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null| no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null| no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none| no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown| no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none| no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank| no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+--------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumnRenamed(\"name\", \"Who\").withColumnRenamed(\"jedi\", \"Religion\").show()"
]
},
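{
"cell_type": "markdown",
"metadata": {},
"source": [
"When many columns change, the chain gets long. A minimal sketch, written as a standalone program and assuming a local `SparkSession`, a trimmed-down hypothetical case class `Person` and a hypothetical helper `renameColumns` (not part of Spark's API), folds a map of old-to-new names over `withColumnRenamed`. Like the chained calls above, it returns a DataFrame.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}\n",
"\n",
"// Hypothetical, trimmed-down record type for illustration only\n",
"case class Person(name: String, jedi: String, species: String)\n",
"\n",
"object RenameColumnsSketch {\n",
"  // Hypothetical helper: one withColumnRenamed call per map entry.\n",
"  // Assumes no new name is also an old name, so the map's order does not matter.\n",
"  def renameColumns(ds: Dataset[_], renames: Map[String, String]): DataFrame =\n",
"    renames.foldLeft(ds.toDF()) { case (df, (from, to)) => df.withColumnRenamed(from, to) }\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    val spark = SparkSession.builder().master(\"local[*]\").appName(\"rename-columns\").getOrCreate()\n",
"    import spark.implicits._\n",
"\n",
"    val ds = Seq(Person(\"Yoda\", \"jedi\", \"yoda\"), Person(\"R2-D2\", \"no_jedi\", \"droid\")).toDS()\n",
"    val renamed = renameColumns(ds, Map(\"name\" -> \"Who\", \"jedi\" -> \"Religion\"))\n",
"    println(renamed.columns.mkString(\", \")) // Who, Religion, species\n",
"    spark.stop()\n",
"  }\n",
"}\n",
"```\n",
"\n",
"Note that `withColumnRenamed` is a no-op when the old name is not in the schema, so a typo in the map does not raise an error."
]
},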
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To rename all the columns, a shorter way is to convert the Dataset into a DataFrame with `.toDF(...)`, passing all the new column names at once.\n",
"\n",
"Column names are matched to the case class fields case-insensitively (Spark's default, `spark.sql.caseSensitive=false`). So if you changed only the letter case of the column names, your original case class still works. But if a new column name differs in anything other than letter case, you need a new case class definition with matching field names."
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Name: string, Height: int ... 6 more fields]"
]
},
"execution_count": 58,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.toDF(Seq(\"Name\", \"Height\", \"Weight\", \"Eyecolor\", \"Haircolor\", \"Jedi\", \"Species\", \"Friends\"): _*).as[SW]"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| Name|Height|Weight|Eyecolor|Haircolor| Jedi| Species| Friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.toDF(Seq(\"Name\", \"Height\", \"Weight\", \"Eyecolor\", \"Haircolor\", \"Jedi\", \"Species\", \"Friends\"): _*).as[SW].show()"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Name: Syntax Error.\n",
"Message: \n",
"StackTrace: "
]
},
"execution_count": 60,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
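"// Fails: SW's field `name` has no matching column once it is renamed to WHO, so a new case class is needed\n",
"// sw_ds.toDF(Seq(\"WHO\", \"Height\", \"Weight\", \"Eyecolor\", \"Haircolor\", \"Jedi\", \"Species\", \"Friends\"): _*).as[SW]"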
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class SW2\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class SW2(WHO: String,\n",
" height: Integer, \n",
" weight: Option[Integer], \n",
" eyecolor: Option[String], \n",
" haircolor: Option[String], \n",
" jedi: String,\n",
" species: String,\n",
" friends: String)\n",
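"// Register the notebook's outer scope so Spark can instantiate case classes defined here (they are inner classes of the REPL scope)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"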
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[WHO: string, Height: int ... 6 more fields]"
]
},
"execution_count": 62,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.toDF(Seq(\"WHO\", \"Height\", \"Weight\", \"Eyecolor\", \"Haircolor\", \"Jedi\", \"Species\", \"Friends\"): _*).as[SW2]"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| WHO|Height|Weight|Eyecolor|Haircolor| Jedi| Species| Friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.toDF(Seq(\"WHO\", \"Height\", \"Weight\", \"Eyecolor\", \"Haircolor\", \"Jedi\", \"Species\", \"Friends\"): _*).as[SW2].show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. ADDING NEW COLUMNS\n",
"There are several ways to add new columns to a Dataset, depending on what kind of column is created; I show the main types with examples. Regardless of how the new column is added and whether its type is specified, the result is a DataFrame. So if you need a Dataset, define a matching case class and convert the DataFrame into a Dataset.\n",
"\n",
"### CONSTANT COLUMN\n",
"Adding a constant column is easy: call `withColumn` with the name of the new column and the value wrapped in `lit(...)`. The result is a DataFrame even if you specify the type of the new column, as in `sw_ds.withColumn(\"count\", lit(1).as[Integer])`."
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 64,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumn(\"count\", lit(1))"
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|count|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 1|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 1|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 1|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 1|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 1|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 1|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 1|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 1|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 1|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 1|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 1|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"count\", lit(1)).show()"
]
},
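{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since `withColumn` always returns a DataFrame, getting a typed Dataset back takes one more step. A minimal sketch, written as a standalone program and assuming a local `SparkSession` and hypothetical trimmed-down case classes `Creature` and `CreatureWithCount`, adds the constant column and then re-attaches a type with `.as[...]`:\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}\n",
"import org.apache.spark.sql.functions.lit\n",
"\n",
"// Hypothetical, trimmed-down record types for illustration only\n",
"case class Creature(name: String, height: Integer)\n",
"case class CreatureWithCount(name: String, height: Integer, count: Integer)\n",
"\n",
"object ConstantColumnSketch {\n",
"  def withCount(spark: SparkSession, ds: Dataset[Creature]): Dataset[CreatureWithCount] = {\n",
"    import spark.implicits._\n",
"    // withColumn returns an untyped DataFrame, even for a constant column\n",
"    val df: DataFrame = ds.withColumn(\"count\", lit(1))\n",
"    // Re-attach a type: the case class must also declare the new field\n",
"    df.as[CreatureWithCount]\n",
"  }\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    val spark = SparkSession.builder().master(\"local[*]\").appName(\"constant-column\").getOrCreate()\n",
"    import spark.implicits._\n",
"    val ds = Seq(Creature(\"Yoda\", 66), Creature(\"R2-D2\", 96)).toDS()\n",
"    withCount(spark, ds).show()\n",
"    spark.stop()\n",
"  }\n",
"}\n",
"```\n",
"\n",
"The second case class only has to match the new schema by column name and type; `.as[...]` maps case class fields to columns of the same name."
]
},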
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### EXPRESSION1 – TYPE1\n",
"When the function works on a single column, many functions in `org.apache.spark.sql.functions` (such as `log`) also accept the column name as a plain string, so we can simply pass `\"column_name\"`. In the example I calculate the logarithm of each character's weight."
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 66,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumn(\"log_weight\", log(\"weight\"))"
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| log_weight|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 4.430816798843313|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 4.343805421853684|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|3.8918202981106265|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 4.343805421853684|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 4.382026634673881|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 4.31748811353631|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|3.4657359027997265|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 4.31748811353631|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 2.833213344056216|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 4.382026634673881|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 4.718498871295094|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| null|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|4.3694478524670215|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"log_weight\", log(\"weight\")).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### EXPRESSION2 – TYPE2\n",
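"When the expression involves more than a single column name, for example arithmetic between columns, we refer to the columns as `Column` objects such as `dataset_name(\"column_name\")` (`col(\"column_name\")` and `$\"column_name\"` work as well).\n",
"\n",
"For example, we can calculate the body mass index (BMI) of the characters, with height in cm and weight in kg."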
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 68,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumn(\"BMI\", sw_ds(\"weight\")/(sw_ds(\"height\")*sw_ds(\"height\")/10000))"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| BMI|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 23.76641014033499|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 26.0275824770146|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 21.77777777777778|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|23.245984784446325|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|24.691358024691358|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 25.05930702662969|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 34.72222222222222|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 26.89232313815483|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 39.02662993572085|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|26.122448979591837|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|21.545090797168356|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| null|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|24.933720489837143|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"BMI\", sw_ds(\"weight\")/(sw_ds(\"height\")*sw_ds(\"height\")/10000)).show()"
]
},
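{
"cell_type": "markdown",
"metadata": {},
"source": [
"The BMI expression above is the usual formula with height $h$ in cm and weight $w$ in kg; dividing $h^2$ by 10000 converts cm² to m². Spark's `/` returns a double even for the integer columns used here, so nothing is truncated. Checking Anakin's row by hand:\n",
"\n",
"$$\\text{BMI} = \\frac{w}{(h/100)^2} = \\frac{84}{1.88^2} = \\frac{84}{3.5344} \\approx 23.77$$\n",
"\n",
"which matches the 23.7664… in the output. Jabba's weight is missing, so his BMI is `null` as well."
]
},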
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### USER DEFINED FUNCTION\n",
"Finally, if we want to use a function that is defined for plain Scala values such as Integer or String rather than for columns, we have to wrap it in a user-defined function (UDF).\n",
"\n",
"The example creates a column containing a tuple made from two columns of the Dataset. Tuples are built from plain values, so we define a UDF that builds one and then apply it to the columns. Spark stores the tuple as a struct column, shown as `[jedi, human]` in the output."
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"createTuple2: [Type_x, Type_y](implicit evidence$1: reflect.runtime.universe.TypeTag[Type_x], implicit evidence$2: reflect.runtime.universe.TypeTag[Type_y])org.apache.spark.sql.expressions.UserDefinedFunction\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import scala.reflect.runtime.universe.TypeTag\n",
"def createTuple2[Type_x: TypeTag, Type_y: TypeTag] = udf[(Type_x, Type_y), Type_x, Type_y]((x: Type_x, y: Type_y) => (x, y))"
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 71,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumn(\"Jedi_Species\", createTuple2[String, String].apply(sw_ds(\"jedi\"), sw_ds(\"species\")))"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| Jedi_Species|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| [jedi, human]|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| [jedi, human]|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| [no_jedi, human]|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| [jedi, human]|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| [no_jedi, human]|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| [no_jedi, human]|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| [no_jedi, droid]|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| [no_jedi, droid]|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| [jedi, yoda]|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|[no_jedi, dathomi...|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| [no_jedi, wookiee]|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| [no_jedi, hutt]|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| [no_jedi, human]|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"Jedi_Species\", createTuple2[String, String].apply(sw_ds(\"jedi\"), sw_ds(\"species\"))).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also create a tuple column from columns with missing values: Jabba's unknown weight simply leaves the second field of his tuple empty (`[Jabba,]`)."
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 7 more fields]"
]
},
"execution_count": 73,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.withColumn(\"Name_Weight\", createTuple2[String, Option[Integer]].apply(sw_ds(\"name\"), sw_ds(\"weight\")))"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| Name_Weight|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|[Anakin Skywalker...|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|[Luke Skywalker, 77]|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|[Leia Skywalker, 49]|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|[Obi-Wan Kenobi, 77]|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| [Han Solo, 80]|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|[Sheev Palpatine,...|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| [R2-D2, 32]|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| [C-3PO, 75]|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| [Yoda, 17]|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| [Darth Maul, 80]|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| [Chewbacca, 112]|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| [Jabba,]|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|[Lando Calrissian...|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"Name_Weight\", createTuple2[String, Option[Integer]].apply(sw_ds(\"name\"), sw_ds(\"weight\"))).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. FILTERING ROWS\n",
"Filtering keeps a subset of the rows of the Dataset. The good news is that the names and types of the columns do not change at all, so the result of a filter is always a Dataset with proper column names. But we have to be very careful with columns containing missing values: in the filter function we have to define how to treat both the defined values and the missing ones. Let’s see examples.\n",
"\n",
"Filter a string column with no missing values: select the humans in the Dataset."
]
},
{
"cell_type": "code",
"execution_count": 75,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 6 more fields]"
]
},
"execution_count": 75,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.filter(x => x.species==\"human\")"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species| friends|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.species==\"human\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same syntax runs on eyecolor (which contains missing values) without any error, but the result is empty, although several characters have brown eyes."
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+------+------+--------+---------+----+-------+-------+\n",
"|name|height|weight|eyecolor|haircolor|jedi|species|friends|\n",
"+----+------+------+--------+---------+----+-------+-------+\n",
"+----+------+------+--------+---------+----+-------+-------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.eyecolor== \"brown\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The reason is that `==` accepts operands of different types, but a String is never equal to an Option: the defined values of eyecolor are stored as `Some(value)` and the missing ones as `None`. There are two ways to handle the situation:\n",
"\n",
"- Compare with `Some(value)` in the filter\n",
"- Use `.getOrElse()` and define what should be returned for missing values. In the example I use `.getOrElse(\"\")`, which returns the value if it is defined or an empty string if it is missing\n",
"\n",
"So let’s see both ways:"
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species| friends|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.eyecolor == Some(\"brown\")).show()"
]
},
{
"cell_type": "code",
"execution_count": 79,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species| friends|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.eyecolor.getOrElse(\"\") == \"brown\").show()"
]
},
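{
"cell_type": "markdown",
"metadata": {},
"source": [
"The behaviour of these comparisons can be reproduced in plain Scala, without Spark. A minimal sketch, assuming a hypothetical `Seq[Option[String]]` as a stand-in for the eyecolor column; it also shows `Option.contains` (Scala 2.11+), a third way that treats missing values as non-matching:\n",
"\n",
"```scala\n",
"object OptionEqualitySketch {\n",
"  // Hypothetical stand-in for the eyecolor column: Some(...) = defined, None = missing\n",
"  val eyecolors: Seq[Option[String]] = Seq(Some(\"brown\"), Some(\"blue\"), None)\n",
"\n",
"  // What `x.eyecolor == \"brown\"` does: an Option never equals a String\n",
"  // (written with equals here to avoid the compiler's always-false warning)\n",
"  val plain: Seq[Boolean] = eyecolors.map(e => e.equals(\"brown\"))\n",
"  // First way from the text: compare with Some(value)\n",
"  val withSome: Seq[Boolean] = eyecolors.map(e => e == Some(\"brown\"))\n",
"  // Second way from the text: unwrap, using \"\" for missing values\n",
"  val withDefault: Seq[Boolean] = eyecolors.map(e => e.getOrElse(\"\") == \"brown\")\n",
"  // Third way (Scala 2.11+): contains is false for None\n",
"  val withContains: Seq[Boolean] = eyecolors.map(e => e.contains(\"brown\"))\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    println(plain)        // List(false, false, false)\n",
"    println(withSome)     // List(true, false, false)\n",
"    println(withDefault)  // List(true, false, false)\n",
"    println(withContains) // List(true, false, false)\n",
"  }\n",
"}\n",
"```\n",
"\n",
"In the notebook the third way would read `sw_ds.filter(x => x.eyecolor.contains(\"brown\"))`, which should select the same four characters."
]
},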
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Filtering numeric columns without missing values works as expected: select the characters whose height is less than 100 cm."
]
},
{
"cell_type": "code",
"execution_count": 80,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+------+--------+---------+-------+-------+--------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi|species| friends|\n",
"+-----+------+------+--------+---------+-------+-------+--------------+\n",
"|R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda|Obi-Wan Kenobi|\n",
"+-----+------+------+--------+---------+-------+-------+--------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.height<100).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If a numeric column may contain missing values (its type is, for example, Option[Integer]), the syntax above does not compile:\n",
"\n",
"`sw_ds.filter(x => x.weight >= 79)`\n",
"\n",
"ends in\n",
"\n",
"`…error: value >= is not a member of Option[Integer] …`\n",
"\n",
"The solution is to use pattern matching and explicitly define the filter for `Some(...)` values and for `None` (missing) values.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 81,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.filter(x => x.weight match {case Some(y) => y>=79\n",
" case None => false} ).show()"
]
},
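{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same pattern match can be tried in plain Scala. A minimal sketch, assuming a hypothetical `Seq[Option[Integer]]` as a stand-in for the weight column, compares it with `Option.exists`, which is false for `None` and therefore drops missing values the same way:\n",
"\n",
"```scala\n",
"object OptionNumericFilterSketch {\n",
"  // Hypothetical stand-in for the weight column; None marks an unknown weight\n",
"  val weights: Seq[Option[Integer]] =\n",
"    Seq(Some(Integer.valueOf(84)), Some(Integer.valueOf(49)), None, Some(Integer.valueOf(79)))\n",
"\n",
"  // Pattern matching as in the notebook: missing values are dropped explicitly\n",
"  val byMatch: Seq[Option[Integer]] = weights.filter {\n",
"    case Some(w) => w >= 79 // Predef converts java.lang.Integer to Int for >=\n",
"    case None    => false\n",
"  }\n",
"\n",
"  // Shorter equivalent: exists is false for None\n",
"  val byExists: Seq[Option[Integer]] = weights.filter(_.exists(w => w >= 79))\n",
"\n",
"  def main(args: Array[String]): Unit = {\n",
"    println(byMatch)  // List(Some(84), Some(79))\n",
"    println(byExists) // List(Some(84), Some(79))\n",
"  }\n",
"}\n",
"```\n",
"\n",
"In the notebook the shorter form would read `sw_ds.filter(x => x.weight.exists(_ >= 79))`."
]
},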
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. GROUPBY AND AGGREGATING\n",
"Here we calculate functions (mean, min, max etc.) of numeric columns for groups defined by a key column. The syntax is as expected, but we have to specify the type of each result column in the aggregation.\n",
"\n",
"- the key for the grouping is given in `groupByKey(_.columnName)`\n",
"- the aggregation functions are given in `.agg(function_name1(\"columnName1\").as[new_type1], function_name2(\"columnName2\").as[new_type2])`\n",
"\n",
"We can define several aggregation functions for different columns within one aggregation (Spark 2.4 accepts up to four typed columns per `agg` call)."
]
},
{
"cell_type": "code",
"execution_count": 82,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[value: string, max(height): int ... 3 more fields]"
]
},
"execution_count": 82,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.groupByKey(_.species).agg(max($\"height\").as[Integer], min($\"height\").as[Integer], mean($\"weight\").as[Double], count($\"species\").as[Long] )"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+-----------+-----------+-----------------+--------------+\n",
"| value|max(height)|min(height)| avg(weight)|count(species)|\n",
"+-----------+-----------+-----------+-----------------+--------------+\n",
"| hutt| 390| 390| null| 1|\n",
"| human| 188| 150|74.42857142857143| 7|\n",
"|dathomirian| 175| 175| 80.0| 1|\n",
"| yoda| 66| 66| 17.0| 1|\n",
"| wookiee| 228| 228| 112.0| 1|\n",
"| droid| 167| 96| 53.5| 2|\n",
"+-----------+-----------+-----------+-----------------+--------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.groupByKey(_.species).agg(max($\"height\").as[Integer], min($\"height\").as[Integer], mean($\"weight\").as[Double], count($\"species\").as[Long] ).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same works for columns with missing values. Here the key (eyecolor) has missing values, and Jabba, whose weight is unknown, is left out of the mean of the yellow group."
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[value: string, avg(weight): double]"
]
},
"execution_count": 84,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.groupByKey(_.eyecolor).agg(mean($\"weight\").as[Double])"
]
},
{
"cell_type": "code",
"execution_count": 85,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+-----------+\n",
"| value|avg(weight)|\n",
"+--------+-----------+\n",
"| yellow| 80.0|\n",
"| null| 53.5|\n",
"|bluegray| 77.0|\n",
"| brown| 56.25|\n",
"| blue| 87.0|\n",
"+--------+-----------+\n",
"\n"
]
}
],
"source": [
"sw_ds.groupByKey(_.eyecolor).agg(mean($\"weight\").as[Double]).show()"
]
},
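{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check of these null semantics, here is a plain-Scala sketch (no Spark needed) that mimics `groupByKey(_.eyecolor).agg(mean($\"weight\"))` on the eye colors and weights shown in the tables of this notebook. `None` plays the role of null both in the key and in the aggregated column; the `SwCharacter` case class and the `meanIgnoringMissing` helper are hypothetical, not part of Spark.\n",
"\n",
"```scala\n",
"// Hypothetical stand-in for two columns of sw_ds; None plays the role of null.\n",
"case class SwCharacter(name: String, eyecolor: Option[String], weight: Option[Int])\n",
"\n",
"val characters = Seq(\n",
"  SwCharacter(\"Anakin Skywalker\", Some(\"blue\"), Some(84)),\n",
"  SwCharacter(\"Luke Skywalker\", Some(\"blue\"), Some(77)),\n",
"  SwCharacter(\"Leia Skywalker\", Some(\"brown\"), Some(49)),\n",
"  SwCharacter(\"Obi-Wan Kenobi\", Some(\"bluegray\"), Some(77)),\n",
"  SwCharacter(\"Han Solo\", Some(\"brown\"), Some(80)),\n",
"  SwCharacter(\"Sheev Palpatine\", Some(\"blue\"), Some(75)),\n",
"  SwCharacter(\"R2-D2\", None, Some(32)),\n",
"  SwCharacter(\"C-3PO\", None, Some(75)),\n",
"  SwCharacter(\"Yoda\", Some(\"brown\"), Some(17)),\n",
"  SwCharacter(\"Darth Maul\", Some(\"yellow\"), Some(80)),\n",
"  SwCharacter(\"Chewbacca\", Some(\"blue\"), Some(112)),\n",
"  SwCharacter(\"Jabba\", Some(\"yellow\"), None),\n",
"  SwCharacter(\"Lando Calrissian\", Some(\"brown\"), Some(79))\n",
")\n",
"\n",
"// Average only the known values; a group without any known value yields None (null in Spark).\n",
"def meanIgnoringMissing(values: Seq[Option[Int]]): Option[Double] = {\n",
"  val known = values.flatten\n",
"  if (known.isEmpty) None else Some(known.sum.toDouble / known.size)\n",
"}\n",
"\n",
"// A missing key (None) forms its own group, like the null group of groupByKey.\n",
"val meanWeightByEyecolor: Map[Option[String], Option[Double]] =\n",
"  characters.groupBy(_.eyecolor).map { case (eyecolor, group) =>\n",
"    eyecolor -> meanIgnoringMissing(group.map(_.weight))\n",
"  }\n",
"```\n",
"\n",
"The results agree with the avg(weight) column above, e.g. 80.0 for yellow (Jabba's unknown weight is skipped) and 53.5 for the null eye color."
]
},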
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The key can contain missing values; rows with a missing key form a separate (null) group in groupByKey. The aggregated columns can also contain missing values, and these are ignored in numerical computations.\n",
"\n",
"Note that the output is a Dataset of tuples with proper column types. The column names shown in the output (`value`, `avg(weight)`, ...) can be used only as untyped DataFrame columns, e.g. `$\"value\"`; in typed operations the fields must be referred to as `._1`, `._2`, etc. For example, `.map(x => x.value)` won't compile, while `.map(x => x._1)` will.\n",
"\n",
"### GROUPBY MULTIPLE KEYS\n",
"To group by multiple keys, return a tuple of the key columns from the function passed to groupByKey."
]
},
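{
"cell_type": "markdown",
"metadata": {},
"source": [
"The result of a groupByKey with a tuple key is typed as `((K1, K2, ...), aggregate, ...)`, which is why its fields are accessed positionally. The plain-Scala sketch below (no Spark needed; the `Person` case class is hypothetical) groups a few rows of sw_ds by a `(species, jedi)` tuple and reads the result with `._1` and `._2`, the same way you would on the Dataset.\n",
"\n",
"```scala\n",
"// Hypothetical stand-in for a few columns of sw_ds.\n",
"case class Person(name: String, species: String, jedi: String, weight: Int)\n",
"\n",
"val people = Seq(\n",
"  Person(\"Anakin Skywalker\", \"human\", \"jedi\", 84),\n",
"  Person(\"Luke Skywalker\", \"human\", \"jedi\", 77),\n",
"  Person(\"Leia Skywalker\", \"human\", \"no_jedi\", 49),\n",
"  Person(\"Han Solo\", \"human\", \"no_jedi\", 80),\n",
"  Person(\"Yoda\", \"yoda\", \"jedi\", 17)\n",
")\n",
"\n",
"// Key: a (species, jedi) tuple. Value: (mean weight, row count) of the group.\n",
"val stats: Seq[((String, String), (Double, Long))] =\n",
"  people\n",
"    .groupBy(p => (p.species, p.jedi))\n",
"    .toSeq\n",
"    .map { case (key, group) =>\n",
"      val meanWeight = group.map(_.weight).sum.toDouble / group.size\n",
"      (key, (meanWeight, group.size.toLong))\n",
"    }\n",
"    .sortBy(_._1)\n",
"\n",
"// Positional access: _1 is the key tuple, _2 holds the aggregates.\n",
"val humanJedi = (\"human\", \"jedi\")\n",
"val humanJediMeanWeight: Option[Double] =\n",
"  stats.find(_._1 == humanJedi).map(_._2._1)\n",
"```\n",
"\n",
"The 80.5 obtained for human jedi matches the `[human, jedi, blond]` row in the output below, since Anakin and Luke are the only human jedi with blond hair."
]
},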
{
"cell_type": "code",
"execution_count": 86,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[key: struct<_1: string, _2: string ... 1 more field>, avg(weight): double ... 1 more field]"
]
},
"execution_count": 86,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($\"weight\").as[Double], count($\"species\").as[Long])"
]
},
{
"cell_type": "code",
"execution_count": 87,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----------+--------------+\n",
"| key|avg(weight)|count(species)|\n",
"+--------------------+-----------+--------------+\n",
"|[hutt, no_jedi, n...| null| 1|\n",
"|[human, no_jedi, ...| 79.0| 1|\n",
"|[dathomirian, no_...| 80.0| 1|\n",
"|[human, no_jedi, ...| 75.0| 1|\n",
"| [droid, no_jedi,]| 53.5| 2|\n",
"|[wookiee, no_jedi...| 112.0| 1|\n",
"|[human, no_jedi, ...| 64.5| 2|\n",
"| [yoda, jedi, brown]| 17.0| 1|\n",
"|[human, jedi, aub...| 77.0| 1|\n",
"|[human, jedi, blond]| 80.5| 2|\n",
"+--------------------+-----------+--------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($\"weight\").as[Double], count($\"species\").as[Long] ).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
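"# 8. SORTING BY ROWS\n",
"Sorting is easy; there are no surprises in the syntax. In the example below the rows are sorted by species in descending order and, within each species, by weight in ascending order."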
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 6 more fields]"
]
},
"execution_count": 88,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.orderBy($\"species\".desc, $\"weight\")"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.orderBy($\"species\".desc, $\"weight\").show()"
]
},
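{
"cell_type": "markdown",
"metadata": {},
"source": [
"`orderBy($\"species\".desc, $\"weight\")` is a lexicographic sort on two keys with different directions. A plain-Scala sketch of the same ordering on a few rows (no Spark needed; the `Hero` case class is hypothetical). `None` stands for a null weight and, as with Spark's default ascending order, sorts before the known values.\n",
"\n",
"```scala\n",
"// Hypothetical stand-in for a few rows of sw_ds; None stands for a null weight.\n",
"case class Hero(name: String, species: String, weight: Option[Int])\n",
"\n",
"val heroes = Seq(\n",
"  Hero(\"Leia Skywalker\", \"human\", Some(49)),\n",
"  Hero(\"R2-D2\", \"droid\", Some(32)),\n",
"  Hero(\"Jabba\", \"hutt\", None),\n",
"  Hero(\"Sheev Palpatine\", \"human\", Some(75)),\n",
"  Hero(\"C-3PO\", \"droid\", Some(75)),\n",
"  Hero(\"Yoda\", \"yoda\", Some(17))\n",
")\n",
"\n",
"// species descending, then weight ascending (None before Some, like Spark's default asc).\n",
"val bySpeciesDescWeightAsc: Ordering[(String, Option[Int])] =\n",
"  Ordering.Tuple2(Ordering.String.reverse, Ordering.Option(Ordering.Int))\n",
"\n",
"val sortedNames: Seq[String] =\n",
"  heroes.sortBy(h => (h.species, h.weight))(bySpeciesDescWeightAsc).map(_.name)\n",
"```\n",
"\n",
"This reproduces the relative order of the output above: yoda, hutt, human, droid, with lighter characters first within human and droid."
]
},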
{
"cell_type": "markdown",
"metadata": {},
"source": [
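"# 9. APPENDING DATASETS\n",
"Appending two Datasets with the same case class definition is straightforward with union. Note that union keeps duplicate rows (like SQL UNION ALL) and matches columns by position, not by name; use unionByName to match columns by name."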
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[name: string, height: int ... 6 more fields]"
]
},
"execution_count": 90,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.union(sw_ds)"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"sw_ds.union(sw_ds).show()"
]
},
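{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because union keeps duplicates, every character appears twice in the output above. A plain-Scala sketch of the difference between `union` alone and `union` followed by `distinct()`, using two small hypothetical lists of names:\n",
"\n",
"```scala\n",
"val left = Seq(\"Yoda\", \"Han Solo\")\n",
"val right = Seq(\"Han Solo\", \"Chewbacca\")\n",
"\n",
"// Like ds1.union(ds2): UNION ALL semantics, duplicates are kept.\n",
"val unionAll = left ++ right\n",
"\n",
"// Like ds1.union(ds2).distinct(): duplicates removed (Spark does not guarantee the row order).\n",
"val unionDistinct = (left ++ right).distinct\n",
"```"
]
},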
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 10. OTHER USEFUL FUNCTIONS\n",
"### DESCRIBE THE DATASET\n",
"- Get the number of records with `count()`\n",
"- Get the number of columns with `.columns.size`\n",
"- Get the schema with `printSchema` or `dtypes`"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"13"
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.count()"
]
},
{
"cell_type": "code",
"execution_count": 93,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"8"
]
},
"execution_count": 93,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.columns.size"
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: string (nullable = true)\n",
" |-- height: integer (nullable = true)\n",
" |-- weight: integer (nullable = true)\n",
" |-- eyecolor: string (nullable = true)\n",
" |-- haircolor: string (nullable = true)\n",
" |-- jedi: string (nullable = true)\n",
" |-- species: string (nullable = true)\n",
" |-- friends: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"sw_ds.printSchema"
]
},
{
"cell_type": "code",
"execution_count": 95,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Array((name,StringType), (height,IntegerType), (weight,IntegerType), (eyecolor,StringType), (haircolor,StringType), (jedi,StringType), (species,StringType), (friends,StringType))"
]
},
"execution_count": 95,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.dtypes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SOME MORE AGGREGATION FUNCTIONS\n",
"Calculate the (Pearson) correlation between two columns, optionally by groups."
]
},
{
"cell_type": "code",
"execution_count": 96,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[corr(height, weight): double]"
]
},
"execution_count": 96,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sw_ds.agg(corr($\"height\", $\"weight\").as[Double])"
]
},
{
"cell_type": "code",
"execution_count": 97,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+\n",
"|corr(height, weight)|\n",
"+--------------------+\n",
"| 0.9823964963433258|\n",
"+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.agg(corr($\"height\", $\"weight\").as[Double]).show()"
]
},
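{
"cell_type": "markdown",
"metadata": {},
"source": [
"`corr` computes the Pearson correlation coefficient. The plain-Scala sketch below (the `pearson` helper is ours, not a Spark API) recomputes it from the height and weight values shown in the tables above. Jabba is left out because his weight is null, since corr only uses rows where both values are present.\n",
"\n",
"```scala\n",
"// Pearson correlation: covariance divided by the product of the standard deviations.\n",
"def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {\n",
"  require(xs.size == ys.size && xs.size > 1, \"need two samples of the same length (>= 2)\")\n",
"  val n = xs.size\n",
"  val meanX = xs.sum / n\n",
"  val meanY = ys.sum / n\n",
"  val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum\n",
"  val varX = xs.map(x => (x - meanX) * (x - meanX)).sum\n",
"  val varY = ys.map(y => (y - meanY) * (y - meanY)).sum\n",
"  cov / math.sqrt(varX * varY)\n",
"}\n",
"\n",
"// (height, weight) pairs from sw_ds; Jabba is omitted because his weight is null.\n",
"val heightWeight: Seq[(Double, Double)] = Seq(\n",
"  (188, 84), (172, 77), (150, 49), (182, 77), (180, 80), (173, 75),\n",
"  (96, 32), (167, 75), (66, 17), (175, 80), (228, 112), (178, 79)\n",
").map { case (h, w) => (h.toDouble, w.toDouble) }\n",
"\n",
"val r = pearson(heightWeight.map(_._1), heightWeight.map(_._2))\n",
"```\n",
"\n",
"The value agrees with the corr(height, weight) result above up to floating-point rounding."
]
},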
{
"cell_type": "code",
"execution_count": 98,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+\n",
"| value|corr(height, weight)|\n",
"+-------+--------------------+\n",
"|no_jedi| 0.9749158985081434|\n",
"| jedi| 0.9973783324232722|\n",
"+-------+--------------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.groupByKey(_.jedi).agg(corr($\"height\", $\"weight\").as[Double]).show()"
]
},
{
"cell_type": "code",
"execution_count": 99,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+------------------+\n",
"| value|first(name, false)|\n",
"+-----------+------------------+\n",
"| hutt| Jabba|\n",
"| human| Anakin Skywalker|\n",
"|dathomirian| Darth Maul|\n",
"| yoda| Yoda|\n",
"| wookiee| Chewbacca|\n",
"| droid| R2-D2|\n",
"+-----------+------------------+\n",
"\n"
]
}
],
"source": [
"// Get the first name in each group; first() is non-deterministic, as the row order after a shuffle is not guaranteed\n",
"sw_ds.groupByKey(_.species).agg(first($\"name\").as[String]).show()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### OTHER USEFUL FUNCTIONS FOR CREATING NEW COLUMNS\n",
"In the following examples we add new columns to the Dataset, so the result is a DataFrame. To get a Dataset again, create a matching case class and convert the result with `.as[...]`.\n",
"\n",
"The first example computes the hash of a column. The `hash` function uses Murmur3 with seed 42, so null values hash to 42 (see R2-D2 and C-3PO)."
]
},
{
"cell_type": "code",
"execution_count": 100,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|hashed_hair|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| -519935767|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| -519935767|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 1075090752|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 1156710799|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 1075090752|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1862204291|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 42|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 42|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1075090752|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| -209080169|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 1075090752|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| -209080169|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 539099867|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"hashed_hair\", hash(sw_ds(\"haircolor\"))).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next example calculates the size of a collection: the number of friends listed in the friends column. We need two steps:\n",
"\n",
"- split the friends column at the string `\", \"`. This way we get an Array of Strings. Note that map drops the column names, so after this step we have to refer to the split column as `_2`\n",
"- use `size()` to get the number of items in that Array"
]
},
{
"cell_type": "code",
"execution_count": 101,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------------------+-----------------+\n",
"| _1| _2|NrOfFriendsListed|\n",
"+----------------+--------------------+-----------------+\n",
"|Anakin Skywalker| [Sheev Palpatine]| 1|\n",
"| Luke Skywalker|[Han Solo, Leia S...| 2|\n",
"| Leia Skywalker| [Obi-Wan Kenobi]| 1|\n",
"| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| 2|\n",
"| Han Solo|[Leia Skywalker, ...| 4|\n",
"| Sheev Palpatine| [Anakin Skywalker]| 1|\n",
"| R2-D2| [C-3PO]| 1|\n",
"| C-3PO| [R2-D2]| 1|\n",
"| Yoda| [Obi-Wan Kenobi]| 1|\n",
"| Darth Maul| [Sheev Palpatine]| 1|\n",
"| Chewbacca| [Han Solo]| 1|\n",
"| Jabba| [Boba Fett]| 1|\n",
"|Lando Calrissian| [Han Solo]| 1|\n",
"+----------------+--------------------+-----------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.\n",
" map(x => (x.name, x.friends.split(\", \")) ).\n",
" withColumn(\"NrOfFriendsListed\", size($\"_2\")).show()"
]
},
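{
"cell_type": "markdown",
"metadata": {},
"source": [
"The split inside the map above is plain Scala/Java `String.split`, which interprets its argument as a regular expression. A small sketch of what the split and size steps compute for one row (Obi-Wan Kenobi's friends string, as shown in the table), plus a caveat: splitting an empty string yields one empty element, not zero elements.\n",
"\n",
"```scala\n",
"// Obi-Wan Kenobi's friends column, as shown in the tables above.\n",
"val obiWanFriends = \"Yoda, Qui-Gon Jinn\"\n",
"\n",
"// Same call as x.friends.split(\", \") in the map; the argument is a regex.\n",
"val parts: Array[String] = obiWanFriends.split(\", \")\n",
"\n",
"// What size($\"_2\") returns for this row.\n",
"val nrOfFriendsListed = parts.length\n",
"\n",
"// Caveat: an empty string splits into one empty element.\n",
"val emptySplitSize = \"\".split(\", \").length\n",
"```"
]
},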
{
"cell_type": "markdown",
"metadata": {},
"source": [
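"Add a monotonically increasing id in a new column using the function `monotonically_increasing_id`. The ids are guaranteed to be increasing and unique, but not consecutive; here they happen to run from 0 to 12, presumably because all rows sit in a single partition."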
]
},
{
"cell_type": "code",
"execution_count": 102,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+---+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| id|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+---+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 0|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 1|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 2|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 3|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 4|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 5|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 6|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 7|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 8|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 9|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 10|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 11|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 12|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+---+\n",
"\n"
]
}
],
"source": [
"sw_ds.withColumn(\"id\", monotonically_increasing_id).show()"
]
},
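{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why are the ids not guaranteed to be consecutive? According to the Spark documentation, the current implementation of `monotonically_increasing_id` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A plain-Scala sketch of that layout (the `monotonicId` helper and the two-partition split are hypothetical):\n",
"\n",
"```scala\n",
"// Id layout described in the Spark docs: partition ID in the upper 31 bits,\n",
"// record number within the partition in the lower 33 bits.\n",
"def monotonicId(partitionId: Int, recordInPartition: Long): Long =\n",
"  (partitionId.toLong << 33) + recordInPartition\n",
"\n",
"// Hypothetical layout: partition 0 holds 3 rows, partition 1 holds 2 rows.\n",
"val rowsPerPartition = Seq(0 -> 3, 1 -> 2)\n",
"val ids: Seq[Long] = rowsPerPartition.flatMap { case (partition, rows) =>\n",
"  (0L until rows.toLong).map(record => monotonicId(partition, record))\n",
"}\n",
"```\n",
"\n",
"The ids keep increasing but jump from 2 to 8589934592 at the partition boundary; with a single partition they are consecutive, as in the output above."
]
},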
{
"cell_type": "code",
"execution_count": 103,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends| random|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 0.3686924743441671|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 0.44989139464755157|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 0.9446986686535018|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 0.43391947399906095|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 0.4759764393032505|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 0.06933301074534126|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 0.5942209110814239|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 0.6649066835882118|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 0.2905901748469666|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 0.4217074255263884|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 0.4602309435975347|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 0.1545522130629342|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|0.004784595229000543|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"// Create a column of uniformly distributed random numbers in [0, 1); pass a seed, e.g. rand(42), for repeatable results.\n",
"sw_ds.withColumn(\"random\",rand).show()"
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|name_lenth|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 16|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 14|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 14|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 14|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 8|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 15|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 5|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 5|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 4|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 10|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 9|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 5|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 16|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+\n",
"\n"
]
}
],
"source": [
"// Calculate the length of the strings in a column, for example the length of the character names in our Dataset.\n",
"sw_ds.withColumn(\"name_lenth\", length(sw_ds(\"name\"))).show()"
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|name_species_diff|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 15|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 12|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 13|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 12|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 8|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 12|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 5|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 5|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 9|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 9|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 5|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 14|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+\n",
"\n"
]
}
],
"source": [
"// We can also compute the Levenshtein distance between two string columns:\n",
"sw_ds.withColumn(\"name_species_diff\", levenshtein(sw_ds(\"name\"), sw_ds(\"species\"))).show()"
]
},
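{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Levenshtein distance is the minimum number of single-character insertions, deletions and substitutions needed to turn one string into the other. A plain-Scala sketch of the classic dynamic-programming algorithm (our own helper, not Spark's implementation) reproduces values from the table above; the comparison is case-sensitive, which is why Yoda and yoda are at distance 1.\n",
"\n",
"```scala\n",
"// Classic dynamic-programming Levenshtein distance, keeping only two rows of the table.\n",
"def levenshtein(a: String, b: String): Int = {\n",
"  // At the start of iteration i, prev(j) = distance between a.take(i - 1) and b.take(j).\n",
"  var prev: Array[Int] = (0 to b.length).toArray\n",
"  for (i <- 1 to a.length) {\n",
"    val curr = new Array[Int](b.length + 1)\n",
"    curr(0) = i // delete all i characters of a\n",
"    for (j <- 1 to b.length) {\n",
"      val substitution = if (a(i - 1) == b(j - 1)) 0 else 1\n",
"      curr(j) = math.min(\n",
"        math.min(prev(j) + 1, curr(j - 1) + 1), // deletion, insertion\n",
"        prev(j - 1) + substitution              // match or substitution\n",
"      )\n",
"    }\n",
"    prev = curr\n",
"  }\n",
"  prev(b.length)\n",
"}\n",
"\n",
"val yodaDiff = levenshtein(\"Yoda\", \"yoda\")\n",
"val hanDiff = levenshtein(\"Han Solo\", \"human\")\n",
"```"
]
},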
{
"cell_type": "code",
"execution_count": 106,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"| name|height|weight|eyecolor|haircolor| jedi| species| friends|Loc_y|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 8|\n",
"| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 6|\n",
"| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 6|\n",
"| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 0|\n",
"| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 5|\n",
"| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1|\n",
"| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 0|\n",
"| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 0|\n",
"| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 0|\n",
"| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 0|\n",
"| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 0|\n",
"| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 0|\n",
"|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 0|\n",
"+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+\n",
"\n"
]
}
],
"source": [
"// Finally, we can find the position of a substring within a string using locate. In the example we look for the first occurrence of the letter \"S\" in the character names. locate is case-sensitive and returns the 1-based position, or 0 if the substring is not found.\n",
"sw_ds.withColumn(\"Loc_y\", locate(\"S\", sw_ds(\"name\"))).show()"
]
},
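{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since `locate` returns a 1-based position with 0 meaning \"not found\", it behaves like Java's 0-based `indexOf` shifted by one. A plain-Scala sketch (the `locateLike` helper is hypothetical and ignores Spark's handling of null inputs) that reproduces a few values of the Loc_y column:\n",
"\n",
"```scala\n",
"// 1-based position of the first occurrence of substr in str, or 0 if it does not occur.\n",
"// Like locate, the search is case-sensitive; null inputs are not handled here.\n",
"def locateLike(substr: String, str: String): Int = str.indexOf(substr) + 1\n",
"\n",
"val positions: Map[String, Int] =\n",
"  Seq(\"Anakin Skywalker\", \"Sheev Palpatine\", \"Han Solo\", \"Lando Calrissian\")\n",
"    .map(name => name -> locateLike(\"S\", name))\n",
"    .toMap\n",
"```"
]
},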
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 11. FRIENDCOUNT EXAMPLE\n",
"\n",
"THERE IS NO SPARK TUTORIAL WITHOUT THE BELOVED WORDCOUNT EXAMPLE 😉\n",
"\n",
"I prepared a slightly modified version of the wordcount task: let's calculate how many times each character was referred to as a friend in the friends column.\n",
"\n",
"I solve this problem in two ways.\n",
"\n",
"**First solution**\n",
"\n",
"In the first solution I use only the friends column and do the following steps:\n",
"\n",
"- map – select the friends column\n",
"- flatMap and split – split the strings in the friends column at `\", \"`; as a result every full name will be in a new row\n",
"- groupByKey – the key is the new (split) column\n",
"- count – get the counts\n",
"\n",
"So the result is how many times a character was mentioned as a friend.\n",
"\n",
"To run a classic wordcount instead, split the text at spaces using `split(\" \")`."
]
},
{
"cell_type": "code",
"execution_count": 107,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------+\n",
"| value|count(1)|\n",
"+----------------+--------+\n",
"| C-3PO| 1|\n",
"| Han Solo| 3|\n",
"| Sheev Palpatine| 2|\n",
"| Leia Skywalker| 2|\n",
"| Boba Fett| 1|\n",
"| Qui-Gon Jinn| 1|\n",
"| Yoda| 1|\n",
"| Luke Skywalker| 1|\n",
"| Obi-Wan Kenobi| 3|\n",
"|Anakin Skywalker| 1|\n",
"| Chewbacca| 1|\n",
"| R2-D2| 1|\n",
"+----------------+--------+\n",
"\n"
]
}
],
"source": [
"sw_ds.\n",
" map(x => x.friends).\n",
" flatMap(_.split(\", \")).\n",
" groupByKey(_.toString).\n",
" count().\n",
" show()\n"
]
},
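{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same friend count can be sketched on plain Scala collections (no Spark needed), which shows how closely the typed Dataset API follows the collections API. The friends strings below are reconstructed from the name – friend pairs shown later in this notebook; the `friendCounts` name is ours.\n",
"\n",
"```scala\n",
"// The friends column of sw_ds, one string per character.\n",
"val friendsColumn = Seq(\n",
"  \"Sheev Palpatine\",\n",
"  \"Han Solo, Leia Skywalker\",\n",
"  \"Obi-Wan Kenobi\",\n",
"  \"Yoda, Qui-Gon Jinn\",\n",
"  \"Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca\",\n",
"  \"Anakin Skywalker\",\n",
"  \"C-3PO\",\n",
"  \"R2-D2\",\n",
"  \"Obi-Wan Kenobi\",\n",
"  \"Sheev Palpatine\",\n",
"  \"Han Solo\",\n",
"  \"Boba Fett\",\n",
"  \"Han Solo\"\n",
")\n",
"\n",
"val friendCounts: Map[String, Int] =\n",
"  friendsColumn\n",
"    .flatMap(_.split(\", \"))  // one element per referenced friend, like flatMap(_.split(\", \"))\n",
"    .groupBy(identity)       // like groupByKey(_.toString)\n",
"    .map { case (friend, references) => friend -> references.size } // like count()\n",
"```\n",
"\n",
"The counts match the table above, e.g. 3 for Han Solo and Obi-Wan Kenobi."
]
},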
{
"cell_type": "markdown",
"metadata": {},
"source": [
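"If the friends column can have missing values, i.e. its type is Option[String], we have to use `.getOrElse(\"\")` to handle them. Note that each missing value then becomes an empty string, which shows up as a separate group with an empty key in the output."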
]
},
{
"cell_type": "code",
"execution_count": 108,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------+\n",
"| value|count(1)|\n",
"+----------------+--------+\n",
"| Sheev Palpatine| 1|\n",
"| Leia Skywalker| 1|\n",
"| Luke Skywalker| 1|\n",
"| Obi-Wan Kenobi| 3|\n",
"|Anakin Skywalker| 1|\n",
"| | 1|\n",
"+----------------+--------+\n",
"\n"
]
}
],
"source": [
"ds_missing.\n",
" map(x => x.friends).\n",
" flatMap(_.getOrElse(\"\").split(\", \")).\n",
" groupByKey(_.toString).\n",
" count().\n",
" show()"
]
},
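{
"cell_type": "markdown",
"metadata": {},
"source": [
"The empty key in the output above comes from the missing value: `getOrElse(\"\")` turns it into `\"\"`, and splitting an empty string yields a single empty element. If that group is unwanted, one option is to let a missing value produce no elements at all. A plain-Scala sketch of both variants on a hypothetical three-row sample:\n",
"\n",
"```scala\n",
"// Hypothetical friends column with a missing value (None plays the role of null).\n",
"val friends: Seq[Option[String]] = Seq(Some(\"Sheev Palpatine\"), Some(\"Yoda, Qui-Gon Jinn\"), None)\n",
"\n",
"// As in the cell above: the missing value becomes one empty-string element.\n",
"val withEmpty: Seq[String] = friends.flatMap(opt => opt.getOrElse(\"\").split(\", \"))\n",
"\n",
"// Option-aware alternative: a missing value contributes no elements.\n",
"val withoutEmpty: Seq[String] = friends.flatMap(opt => opt.toSeq.flatMap(s => s.split(\", \")))\n",
"```\n",
"\n",
"The same Option-aware lambda, `x => x.friends.toSeq.flatMap(_.split(\", \"))`, should also work inside `Dataset.flatMap`, since it only needs an encoder for String."
]
},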
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Second solution**\n",
"\n",
"In the second solution I keep the name column from the original Dataset as well, so we see a name – friend pair in a new row for every referenced friend. This could be useful for more complex questions (for example, how many friends of each character have the letter \"S\" in their names). We can also count both the number of friends listed by each character and the number of times a character was referred to as a friend from the same Dataset.\n",
"\n",
"To get the name – friend pair Dataset, do the following steps:\n",
"\n",
"- use map to select the name column and the friends column split at the string `\", \"`\n",
"- use withColumn to create a new column containing the exploded, split friends. explode creates a new row for every item in the split friends array. The first argument of withColumn is the name of the newly created column; if we use `_2` here, we overwrite the split friends column.\n",
"\n",
"Let's see the code in action:"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {},
"outputs": [],
"source": [
"import org.apache.spark.sql.functions.explode"
]
},
{
"cell_type": "code",
"execution_count": 110,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------------------+----------------+\n",
"| _1| _2| friend|\n",
"+----------------+--------------------+----------------+\n",
"|Anakin Skywalker| [Sheev Palpatine]| Sheev Palpatine|\n",
"| Luke Skywalker|[Han Solo, Leia S...| Han Solo|\n",
"| Luke Skywalker|[Han Solo, Leia S...| Leia Skywalker|\n",
"| Leia Skywalker| [Obi-Wan Kenobi]| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| Yoda|\n",
"| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| Qui-Gon Jinn|\n",
"| Han Solo|[Leia Skywalker, ...| Leia Skywalker|\n",
"| Han Solo|[Leia Skywalker, ...| Luke Skywalker|\n",
"| Han Solo|[Leia Skywalker, ...| Obi-Wan Kenobi|\n",
"| Han Solo|[Leia Skywalker, ...| Chewbacca|\n",
"| Sheev Palpatine| [Anakin Skywalker]|Anakin Skywalker|\n",
"| R2-D2| [C-3PO]| C-3PO|\n",
"| C-3PO| [R2-D2]| R2-D2|\n",
"| Yoda| [Obi-Wan Kenobi]| Obi-Wan Kenobi|\n",
"| Darth Maul| [Sheev Palpatine]| Sheev Palpatine|\n",
"| Chewbacca| [Han Solo]| Han Solo|\n",
"| Jabba| [Boba Fett]| Boba Fett|\n",
"|Lando Calrissian| [Han Solo]| Han Solo|\n",
"+----------------+--------------------+----------------+\n",
"\n"
]
}
],
"source": [
"sw_ds.\n",
" map(x => (x.name, x.friends.split(\", \")) ).\n",
" withColumn(\"friend\", explode($\"_2\")).\n",
" show()"
]
},
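{
"cell_type": "markdown",
"metadata": {},
"source": [
"`explode` turns one row holding an array into one row per array element. On plain Scala collections the same reshaping is a `flatMap` that emits one (name, friend) pair per split friend; a sketch on two rows of sw_ds (the `NamePair` case class is hypothetical):\n",
"\n",
"```scala\n",
"// Hypothetical row type for the exploded result.\n",
"case class NamePair(name: String, friend: String)\n",
"\n",
"// (name, friends) for two rows of sw_ds.\n",
"val rows = Seq(\n",
"  \"Luke Skywalker\" -> \"Han Solo, Leia Skywalker\",\n",
"  \"Obi-Wan Kenobi\" -> \"Yoda, Qui-Gon Jinn\"\n",
")\n",
"\n",
"// One output element per friend, like withColumn(\"friend\", explode($\"_2\")).\n",
"val pairs: Seq[NamePair] = rows.flatMap { case (name, friends) =>\n",
"  friends.split(\", \").toSeq.map(friend => NamePair(name, friend))\n",
"}\n",
"```\n",
"\n",
"In Spark, a similar lambda should also work in a typed `sw_ds.flatMap(...)` that returns objects of the NameFriend case class defined below (with `spark.implicits._` in scope), which keeps proper column names and skips the `_1`/`_2` step; this is an alternative, not what the following cells do."
]
},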
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the example above, the `_2` column contains the result of the split, and the third column, friend, holds one item of `_2` per row. In the next example I use `_2` as the name of the new column, thus overwriting the split friends column, and rename the columns. Then a new case class is defined and the result is converted to a Dataset. We will do more transformations on this Dataset."
]
},
{
"cell_type": "code",
"execution_count": 111,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class NameFriend\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
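"case class NameFriend(name: String, friend: String)\n",
"// Register the REPL's outer scope so Spark can instantiate this case class during deserialization\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"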
]
},
{
"cell_type": "code",
"execution_count": 112,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"NameFriend_df = [name: string, friend: string]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, friend: string]"
]
},
"execution_count": 112,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val NameFriend_df = sw_ds.\n",
" map(x => (x.name, x.friends.split(\", \")) ).\n",
" withColumn(\"_2\", explode($\"_2\")).\n",
" toDF(Seq(\"name\", \"friend\"): _*)"
]
},
{
"cell_type": "code",
"execution_count": 113,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"NameFriend_ds = [name: string, friend: string]\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[name: string, friend: string]"
]
},
"execution_count": 113,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val NameFriend_ds = NameFriend_df.as[NameFriend]"
]
},
{
"cell_type": "code",
"execution_count": 114,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+----------------+\n",
"| name| friend|\n",
"+----------------+----------------+\n",
"|Anakin Skywalker| Sheev Palpatine|\n",
"| Luke Skywalker| Han Solo|\n",
"| Luke Skywalker| Leia Skywalker|\n",
"| Leia Skywalker| Obi-Wan Kenobi|\n",
"| Obi-Wan Kenobi| Yoda|\n",
"| Obi-Wan Kenobi| Qui-Gon Jinn|\n",
"| Han Solo| Leia Skywalker|\n",
"| Han Solo| Luke Skywalker|\n",
"| Han Solo| Obi-Wan Kenobi|\n",
"| Han Solo| Chewbacca|\n",
"| Sheev Palpatine|Anakin Skywalker|\n",
"| R2-D2| C-3PO|\n",
"| C-3PO| R2-D2|\n",
"| Yoda| Obi-Wan Kenobi|\n",
"| Darth Maul| Sheev Palpatine|\n",
"| Chewbacca| Han Solo|\n",
"| Jabba| Boba Fett|\n",
"|Lando Calrissian| Han Solo|\n",
"+----------------+----------------+\n",
"\n"
]
}
],
"source": [
"NameFriend_ds.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we will answer three different questions using the NameFriend_ds Dataset:\n",
"\n",
"A. How many times was each character referred to as a friend?\n",
"\n",
"Solution:\n",
"\n",
"- groupByKey – the key is the split and exploded friend column\n",
"- count – calculate the number of times each friend was referred to\n",
"- orderBy – sort the values by decreasing popularity"
]
},
{
"cell_type": "code",
"execution_count": 115,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------+\n",
"| value|count(1)|\n",
"+----------------+--------+\n",
"| Han Solo| 3|\n",
"| Obi-Wan Kenobi| 3|\n",
"| Leia Skywalker| 2|\n",
"| Sheev Palpatine| 2|\n",
"| C-3PO| 1|\n",
"|Anakin Skywalker| 1|\n",
"| Yoda| 1|\n",
"| Luke Skywalker| 1|\n",
"| Chewbacca| 1|\n",
"| R2-D2| 1|\n",
"| Qui-Gon Jinn| 1|\n",
"| Boba Fett| 1|\n",
"+----------------+--------+\n",
"\n"
]
}
],
"source": [
"//A.\n",
"NameFriend_ds.\n",
" groupByKey(_.friend).\n",
" count().\n",
" orderBy($\"count(1)\".desc).\n",
" show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Han Solo and Obi-Wan Kenobi are the most popular: each was listed as a friend by 3 other characters.\n",
"\n",
"B. How many friends did each character list?\n",
"\n",
"Solution:\n",
"\n",
"- groupByKey – the key is the name of the character\n",
"- count – calculate the number of occurrences of each name\n",
"- orderBy – sort the values by decreasing number of listed friends"
]
},
{
"cell_type": "code",
"execution_count": 116,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------+\n",
"| value|count(1)|\n",
"+----------------+--------+\n",
"| Han Solo| 4|\n",
"| Luke Skywalker| 2|\n",
"| Obi-Wan Kenobi| 2|\n",
"| C-3PO| 1|\n",
"| R2-D2| 1|\n",
"| Leia Skywalker| 1|\n",
"| Sheev Palpatine| 1|\n",
"| Chewbacca| 1|\n",
"| Jabba| 1|\n",
"|Anakin Skywalker| 1|\n",
"| Yoda| 1|\n",
"| Darth Maul| 1|\n",
"|Lando Calrissian| 1|\n",
"+----------------+--------+\n",
"\n"
]
}
],
"source": [
"//B.\n",
"NameFriend_ds.\n",
" groupByKey(_.name).\n",
" count().\n",
" orderBy($\"count(1)\".desc).\n",
" show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Han Solo listed 4 friends, Luke listed 2, etc.\n",
"\n",
"C. How many friends were listed with letter “S” in their names by the characters?\n",
"\n",
"Solution:\n",
"\n",
"- create a case class containing a new Integer column\n",
"- withColumn – add the new column with the position of letter “S”\n",
"- convert the result into Dataset\n",
"- filter rows where the position of “S” is greater than 0 (the remaining rows contain friend with letter “S”)\n",
"- groupByKey – where the key is name of the characters\n",
"- count the number of rows by characters in the filtered Dataset\n",
"- orderBy – sort the values by decreasing number of friends with letter “S” in their names"
]
},
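{
"cell_type": "markdown",
"metadata": {},
"source": [
"The behaviour of `locate` that this solution relies on can be checked on a few names taken from the outputs above (the one-column DataFrame is only an illustration): positions are 1-based, a missing substring gives 0, and the match is case-sensitive, so the lowercase “s” in Calrissian does not count.\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.SparkSession\n",
"import org.apache.spark.sql.functions.{col, locate}\n",
"\n",
"val spark = SparkSession.builder.master(\"local[*]\").appName(\"locate-sketch\").getOrCreate()\n",
"import spark.implicits._\n",
"\n",
"// A few names from the outputs above, as a one-column DataFrame\n",
"val friends = Seq(\"Han Solo\", \"Luke Skywalker\", \"Sheev Palpatine\", \"Yoda\", \"Lando Calrissian\").toDF(\"friend\")\n",
"\n",
"// Expected positions: Han Solo -> 5, Luke Skywalker -> 6, Sheev Palpatine -> 1,\n",
"// Yoda -> 0, Lando Calrissian -> 0 (only a lowercase s)\n",
"friends.withColumn(\"S_in_friend\", locate(\"S\", col(\"friend\"))).show()\n",
"```"
]
},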
{
"cell_type": "code",
"execution_count": 117,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"defined class NameFriendS_ds\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"case class NameFriendS_ds(name: String, friend: String, S_in_friend:Integer)\n",
"org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)"
]
},
{
"cell_type": "code",
"execution_count": 118,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----------------+--------+\n",
"| value|count(1)|\n",
"+----------------+--------+\n",
"| Luke Skywalker| 2|\n",
"| Han Solo| 2|\n",
"| Darth Maul| 1|\n",
"|Anakin Skywalker| 1|\n",
"|Lando Calrissian| 1|\n",
"| Sheev Palpatine| 1|\n",
"| Chewbacca| 1|\n",
"+----------------+--------+\n",
"\n"
]
}
],
"source": [
"//C.\n",
"NameFriend_ds.\n",
" withColumn(\"S_in_friend\", locate(\"S\", (NameFriend_ds(\"friend\"))) ).\n",
" as[NameFriendS_ds].\n",
" filter(x=>x.S_in_friend>0).\n",
" groupByKey(_.name).\n",
" count().\n",
" orderBy($\"count(1)\".desc).\n",
" show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can see for example that Han Solo and Luke Skywalker have two friends `mwhose name contain letter “S”. Characters not listed in the output have no friends with letter “S”."
]
},
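{
"cell_type": "markdown",
"metadata": {},
"source": [
"The helper column and the extra `NameFriendS_ds` case class are not strictly required: a typed `filter` can test the `friend` field directly. A minimal sketch of that alternative, assuming a hypothetical `NameFriend` case class with `name` and `friend` fields and a few illustrative rows standing in for `NameFriend_ds`:\n",
"\n",
"```scala\n",
"import org.apache.spark.sql.SparkSession\n",
"import org.apache.spark.sql.functions.desc\n",
"import org.apache.spark.sql.catalyst.encoders.OuterScopes\n",
"\n",
"// Hypothetical stand-in for the element type of NameFriend_ds\n",
"case class NameFriend(name: String, friend: String)\n",
"OuterScopes.addOuterScope(this) // lets Spark build encoders for classes defined in a notebook cell\n",
"\n",
"val spark = SparkSession.builder.master(\"local[*]\").appName(\"contains-sketch\").getOrCreate()\n",
"import spark.implicits._\n",
"\n",
"// Illustrative rows only\n",
"val pairs = Seq(\n",
"  NameFriend(\"Han Solo\", \"Luke Skywalker\"),\n",
"  NameFriend(\"Han Solo\", \"Chewbacca\"),\n",
"  NameFriend(\"Yoda\", \"Obi-Wan Kenobi\")\n",
").toDS()\n",
"\n",
"// Typed filter on the friend field; String.contains is case-sensitive, like locate\n",
"val withS = pairs.filter(_.friend.contains(\"S\")).groupByKey(_.name).count().toDF(\"name\", \"friends_with_S\").orderBy(desc(\"friends_with_S\"))\n",
"\n",
"withS.show()\n",
"```\n",
"\n",
"With these rows only Han Solo remains, with a count of 1, because Luke Skywalker is the only listed friend containing “S”. Applied to `NameFriend_ds`, `friend.contains(\"S\")` keeps exactly the rows where `locate` returns a value greater than 0, so the counts should match the output of C."
]
},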
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Apache Toree - Scala",
"language": "scala",
"name": "apache_toree_scala"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"mimetype": "text/x-scala",
"name": "scala",
"pygments_lexer": "scala",
"version": "2.11.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@Mageswaran1989
Author

StarWars.csv

"name";"height";"weight";"eyecolor";"haircolor";"jedi";"species"
"Anakin Skywalker";188;84;"blue";"blond";"jedi";"human"
"Padme Amidala";165;45;"brown";"brown";"no_jedi";"human"
"Luke Skywalker";172;77;"blue";"blond";"jedi";"human"
"Leia Skywalker";150;49;"brown";"brown";"no_jedi";"human"
"Qui-Gon Jinn";193;89;"blue";"brown";"jedi";"human"
"Obi-Wan Kenobi";182;77;"bluegray";"auburn";"jedi";"human"
"Han Solo";180;80;"brown";"brown";"no_jedi";"human"
"Sheev Palpatine";173;75;"blue";"red";"no_jedi";"human"
"R2-D2";96;32;;;"no_jedi";"droid"
"C-3PO";167;75;;;"no_jedi";"droid"
"Yoda";66;17;"brown";"brown";"jedi";"yoda"
"Darth Maul";175;80;"yellow";"none";"no_jedi";"dathomirian"
"Dooku";193;86;"brown";"brown";"jedi";"human"
"Chewbacca";228;112;"blue";"brown";"no_jedi";"wookiee"
"Jabba";390;;"yellow";"none";"no_jedi";"hutt"
"Lando Calrissian";178;79;"brown";"blank";"no_jedi";"human"
"Boba Fett";183;78;"brown";"black";"no_jedi";"human"
"Jango Fett";183;79;"brown";"black";"no_jedi";"human"
