#code #apache-spark #partitioned-parquet

🧩 Partitioned Parquet

By Anthony Vilarim Caliani


This is an example of working with partitioned Parquet: here you will find how to read and write partitioned Parquet files.
In this example I'm using the Netflix Shows dataset, so thanks to Shivam Bansal for sharing it.
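
Before diving into the full scripts, here is the idea in a nutshell. A minimal sketch, assuming a spark-shell session and a DataFrame df that already has the release_year and release_month columns:

// Write: Spark creates one sub-folder per distinct (release_year, release_month) pair.
df.write
  .partitionBy("release_year", "release_month")
  .parquet("data/netflix/shows.parquet")

// Read: the root folder returns the whole dataset, a sub-folder returns a single partition.
val all = spark.read.parquet("data/netflix/shows.parquet")
val may2009 = spark.read.parquet("data/netflix/shows.parquet/release_year=2009/release_month=5")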

The important thing here is the code, but if you want to execute it there is a run.sh script to help you out.

# First, run ingestion script...
bash run.sh ingestion

# And then you can read some data...
bash run.sh read         # Read all data
bash run.sh read 2009    # Read all data from "2009"
bash run.sh read 2009 5  # Read all data from "May 2009"

Output

Default partitioning (without partitionBy)

data/netflix/shows-default.parquet
├── _SUCCESS
└── part-00000-341edcc1-f245-46d7-85f9-9f54a8b862ac-c000.snappy.parquet

Partitioned by Year and Month

data/netflix/shows.parquet
├── _SUCCESS
├── release_year=2008
│   ├── release_month=1
│   │   └── part-00000-b60d3cb1-629f-4034-bebc-c75e0341e1b4.c000.snappy.parquet
│   └── release_month=2
│       └── part-00000-b60d3cb1-629f-4034-bebc-c75e0341e1b4.c000.snappy.parquet
└── release_year=2009
    ├── release_month=11
    │   └── part-00000-b60d3cb1-629f-4034-bebc-c75e0341e1b4.c000.snappy.parquet
    └── release_month=5
        └── part-00000-b60d3cb1-629f-4034-bebc-c75e0341e1b4.c000.snappy.parquet
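
By the way, those release_year=.../release_month=... folder names are not just cosmetic: when you read the root folder, Spark's partition discovery turns them back into regular columns, and filters on those columns skip the folders that don't match (partition pruning). A short sketch, assuming the layout above and a spark-shell session:

// Partition discovery: release_year and release_month show up as columns again.
val shows = spark.read.parquet("data/netflix/shows.parquet")
shows.printSchema()

// Partition pruning: this filter only scans the release_year=2009/release_month=5 folder.
shows.filter($"release_year" === 2009 && $"release_month" === 5).show()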

Further Help

If you want to execute this code locally, you have to download the dataset from Kaggle and then put the file into the ./data/ folder.

ingestion.scala

// NOTE: this script is meant to run inside spark-shell (see run.sh),
// which already provides the `spark` session and its implicits ($"...").
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Reads the raw CSV file, keeping the header row as column names.
def read(spark: SparkSession, file: String): DataFrame = {
  println(s"Reading file '$file'...")
  spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv(file)
}

// Writes the data as Parquet, one folder per (release_year, release_month) pair.
def save(df: DataFrame, path: String): Unit = {
  println(s"Writing data to '$path'...")
  df.write
    .partitionBy("release_year", "release_month")
    .mode("append")
    .parquet(path)
}

// Parses the "date_added" column and derives the partition columns from it.
def prepare(df: DataFrame): DataFrame = {
  df.withColumn("date_added", to_date($"date_added", "MMMM d, yyyy"))
    .withColumn("release_year", year($"date_added"))
    .withColumn("release_month", month($"date_added"))
}

//  ______     __  __     __   __
// /\  == \   /\ \/\ \   /\ "-.\ \
// \ \  __<   \ \ \_\ \  \ \ \-.  \
//  \ \_\ \_\  \ \_____\  \ \_\\"\_\
//   \/_/ /_/   \/_____/   \/_/ \/_/
val BUCKET = "./data"
val shows = read(spark, s"$BUCKET/netflix_titles.csv")

println(s"This file has ${shows.count()} records.")
shows.printSchema()

save(prepare(shows), s"$BUCKET/netflix/shows.parquet")

println("That's all folks o/")
sys.exit(0)
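
A quick sanity check of the date pattern used in prepare, runnable in the same spark-shell session (the sample value below is made up, it just follows the dataset's date_added format):

// "MMMM d, yyyy" parses full month names like "September 9, 2019".
Seq("September 9, 2019").toDF("date_added")
  .withColumn("date_added", to_date($"date_added", "MMMM d, yyyy"))
  .select(year($"date_added"), month($"date_added"))
  .show() // => 2019 and 9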
read.scala

import org.apache.spark.sql.{SparkSession, DataFrame}

// Builds a partition path from the non-empty filters, then reads it.
// With both filters set, the path ends in "release_year=YYYY/release_month=M";
// with none set, the whole dataset is read.
def read(spark: SparkSession, path: String, partitions: Map[String, String]): DataFrame = {
  val part = partitions
    .filter(_._2 != "")
    .map(kv => s"${kv._1}=${kv._2}")
    .reduceOption((part, nextPart) => s"${part}/${nextPart}")
    .getOrElse("")
  val file = s"$path/$part"
  println(s"Reading file '$file'...")
  spark.read.parquet(file)
}

//  ______     __  __     __   __
// /\  == \   /\ \/\ \   /\ "-.\ \
// \ \  __<   \ \ \_\ \  \ \ \-.  \
//  \ \_\ \_\  \ \_____\  \ \_\\"\_\
//   \/_/ /_/   \/_____/   \/_/ \/_/
val BUCKET = "./data/netflix"
val RELEASE_YEAR = sys.env.getOrElse("RELEASE_YEAR", "")
val RELEASE_MONTH = sys.env.getOrElse("RELEASE_MONTH", "")

val shows = read(spark, s"$BUCKET/shows.parquet", Map(
  "release_year" -> RELEASE_YEAR,
  "release_month" -> RELEASE_MONTH
))

shows.printSchema()
shows.show()

println(s"This file has ${shows.count()} records.")
println("That's all folks o/")
sys.exit(0)
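
A small aside on the read function above: the filter/map/reduceOption chain can be written more compactly with mkString, which already returns an empty string for an empty collection, so the getOrElse fallback is not needed. A sketch of the equivalent logic:

// Equivalent partition-path building with mkString.
val partitions = Map("release_year" -> "2009", "release_month" -> "5")
val part = partitions
  .filter(_._2.nonEmpty)
  .map { case (key, value) => s"$key=$value" }
  .mkString("/")
// part == "release_year=2009/release_month=5"; an empty map yields "".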
run.sh

#!/bin/bash
# @script run.sh
# @author Anthony Vilarim Caliani
# @contact github.com/avcaliani
#
# @description
# Script to execute Spark Shell jobs...
#
# @params
# 01 - Script Name (ingestion or read)
# 02 - Release Year
# 03 - Release Month
#
# @usage
# ./run.sh ingestion
# ./run.sh read 2009 5
#
echo '
 _  _     _    __ _ _
| \| |___| |_ / _| (_)_ __
| .` / -_)  _|  _| | \ \ /
|_|\_\___|\__|_| |_|_/_\_\'
echo -e "\033[1;31m$(echo $1 | tr '[:lower:]' '[:upper:]') PROCESS\033[00m\n"
RELEASE_YEAR="$2" RELEASE_MONTH="$3" spark-shell -I "$1.scala" && exit 0