Skip to content

Instantly share code, notes, and snippets.

@avcaliani
Last active May 2, 2020 03:12
Show Gist options
  • Save avcaliani/339b4088e4ebf834ca769ce62f3972dc to your computer and use it in GitHub Desktop.
Save avcaliani/339b4088e4ebf834ca769ce62f3972dc to your computer and use it in GitHub Desktop.
#code #apache-spark #spark-tables

🗂 Spark Tables

By Anthony Vilarim Caliani

# # # #

This is an example of working with Spark Tables.
In this example I'm using a Superbowl dataset — thanks to Timo Bozsolik for sharing it.

The important thing here is the code, but if you want to execute it there is a Makefile to help you out.

# Running...
make clean run

Further Help

If you want to execute this code locally, you have to download the dataset from Kaggle, split it into two files named superbowl-xix.csv and superbowl-xx.csv, and finally add those two files to the ./data/ folder.

import org.apache.spark.sql.{SparkSession, DataFrame}
// ___ _ _
// / __|___ _ _ __| |_ __ _ _ _| |_ ___
// | (__/ _ \ ' \(_-< _/ _` | ' \ _(_-<
// \___\___/_||_/__/\__\__,_|_||_\__/__/
// Target database and table names used by `save` and the final read-back.
val DB_NAME = "DB"
val TABLE_NAME = "SUPERBOWL"
// ___ _ _
// | __| _ _ _ __| |_(_)___ _ _ ___
// | _| || | ' \/ _| _| / _ \ ' \(_-<
// |_| \_,_|_||_\__|\__|_\___/_||_/__/
// ANSI colour helpers: wrap `s` in a colour code and reset afterwards,
// so subsequent terminal output is unaffected.
def green(s: String): String = s"${Console.GREEN}$s${Console.RESET}"
def blue(s: String): String = s"${Console.BLUE}$s${Console.RESET}"
def cyan(s: String): String = s"${Console.CYAN}$s${Console.RESET}"
// Reads one Superbowl CSV from ./data (header row, comma-delimited) and
// renames the raw headers to the uppercase identifiers used by the table.
// NOTE(review): the `$` column interpolator assumes `spark.implicits._`
// is in scope, as it is inside spark-shell — confirm if run elsewhere.
def read(spark: SparkSession, file: String): DataFrame = {
  println(s"Reading file '$file'... Here it is a sample...")
  val raw = spark.read
    .option("delimiter", ",")
    .option("header", "true")
    .csv(s"data/$file")
  // Map every CSV header to its SQL-friendly uppercase alias.
  raw.select(
    $"Date".as("DATE"),
    $"SB".as("SB_EDITION"),
    $"Winner".as("WINNER"),
    $"Winner Pts".as("WINNER_PTS"),
    $"Loser".as("LOSER"),
    $"Loser Pts".as("LOSER_PTS"),
    $"MVP".as("MVP"),
    $"Stadium".as("STADIUM"),
    $"City".as("CITY"),
    $"State".as("STATE")
  )
}
// Persists `df` into $DB_NAME.$TABLE_NAME through a temporary view.
// When `firstInput` is true, the database and table are (re)created from
// the DataFrame; otherwise the rows are appended to the existing table.
// Afterwards it prints the table's total record count.
def save(spark: SparkSession, df: DataFrame, firstInput: Boolean = false): Unit = {
  val tempView = s"TMP_$TABLE_NAME"
  df.createOrReplaceTempView(tempView)
  if (firstInput) {
    spark.sql(s"CREATE DATABASE IF NOT EXISTS $DB_NAME")
    // Drop first so re-runs start from a clean table instead of failing.
    spark.sql(s"DROP TABLE IF EXISTS $DB_NAME.$TABLE_NAME")
    spark.sql(s"CREATE TABLE $DB_NAME.$TABLE_NAME AS SELECT * FROM $tempView")
  } else {
    spark.sql(s"INSERT INTO TABLE $DB_NAME.$TABLE_NAME SELECT * FROM $tempView")
  }
  val records = spark.sql(s"SELECT COUNT(*) FROM $DB_NAME.$TABLE_NAME").head.getLong(0)
  println(s"Table '${blue(DB_NAME)}.${cyan(TABLE_NAME)}' has '${green(records.toString)}' records :)")
}
// ______ __ __ __ __
// /\ == \ /\ \/\ \ /\ "-.\ \
// \ \ __< \ \ \_\ \ \ \ \-. \
// \ \_\ \_\ \ \_____\ \ \_\\"\_\
// \/_/ /_/ \/_____/ \/_/ \/_/
// First batch: creates the database and table from scratch.
val firstBatch = read(spark, "superbowl-xix.csv")
firstBatch.show(2)
save(spark, firstBatch, firstInput = true)
// Second batch: appended to the now-existing table.
val secondBatch = read(spark, "superbowl-xx.csv")
secondBatch.show(2)
save(spark, secondBatch)
// Read everything back through the metastore to show both batches landed.
val persisted = spark.table(s"$DB_NAME.$TABLE_NAME")
persisted.show()
println("That's all folks o/")
sys.exit(0)
# Helper tasks for the Spark Tables example.
# NOTE: Make requires recipe lines to be indented with a literal TAB —
# the tabs were lost in the original paste and are restored here.
.DEFAULT_GOAL := help

# Print the list of available tasks (default target).
help:
	@echo ""
	@echo "Available Tasks"
	@echo "---------------"
	@echo "clean Remove generated files and folders."
	@echo "run Execute 'main.scala' on Spark Shell."
	@echo ""

# Remove the local metastore/warehouse artifacts left by previous runs.
clean:
	rm -rf metastore_db spark-warehouse derby.log || true

# Run the script in spark-shell (-I loads the file before the prompt).
run:
	@echo "Running on Spark Shell..."
	spark-shell -I "main.scala"

.PHONY: help clean run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment