Skip to content

Instantly share code, notes, and snippets.

@TomLous
Last active April 5, 2021 19:41
Show Gist options
  • Save TomLous/094d4bc08b9c67e1876e92775e37b6c0 to your computer and use it in GitHub Desktop.
Save TomLous/094d4bc08b9c67e1876e92775e37b6c0 to your computer and use it in GitHub Desktop.
Basic Spark Job for Medium Article
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import xyz.graphiq.model._
/**
 * Spark batch job that aggregates movie ratings.
 *
 * Reads `movies.csv` and `ratings.csv` (both with a header row) from the input
 * directory, joins the ratings to the movies on `movieId`, and for every movie
 * writes its title, the average rating (rounded to 2 decimals) and the number of
 * ratings as Parquet to `<outputPath>/movie-ratings`, overwriting existing output.
 *
 * Usage: `BasicSparkJob <inputPath> <outputPath>`
 *
 * An explicit `main` is used instead of `extends App`: the Spark documentation
 * warns that subclasses of `scala.App` may not work correctly.
 */
object BasicSparkJob {
  // SparkConf is not part of org.apache.spark.sql, so it is imported locally.
  import org.apache.spark.SparkConf

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the input directory, `args(1)` the output directory
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message before a Spark session is spun up.
    require(args.length >= 2, "Usage: BasicSparkJob <inputPath> <outputPath>")
    val inputPath = args(0)
    val outputPath = args(1)

    val spark: SparkSession = SparkSession
      .builder()
      .config(
        // The property key is "spark.master"; the fallback to local mode only
        // applies when no master was configured externally.
        new SparkConf().setIfMissing("spark.master", "local[*]")
      )
      .appName("BasicSparkJob")
      .getOrCreate()
    import spark.implicits._

    // define movie dataset
    val moviesDataset = readCsv[Movie](spark, s"$inputPath/movies.csv")

    // define rating dataset
    val ratingDataset = readCsv[Rating](spark, s"$inputPath/ratings.csv")

    // combine and group and save
    ratingDataset
      // broadcast marks the movies side as small enough for a broadcast join
      .join(broadcast(moviesDataset), "movieId")
      .groupBy($"movieId")
      .agg(
        // the title is taken from the first row of each movieId group
        first($"title").as("title"),
        round(avg($"rating"), 2).as("averageRating"),
        count($"rating").as("numberOfRatings")
      )
      .select($"movieId", $"title", $"averageRating", $"numberOfRatings")
      .as[MovieRating]
      .repartition(20)
      .write
      .mode(SaveMode.Overwrite)
      .parquet(s"$outputPath/movie-ratings")
  }

  /**
   * Reads a CSV file with a header row into a typed Dataset, using the schema
   * derived from the case class `T` rather than inferring it from the data.
   */
  private def readCsv[T <: Product: Encoder](spark: SparkSession, path: String): Dataset[T] =
    spark
      .read
      .option("header", true)
      .schema(implicitly[Encoder[T]].schema)
      .csv(path)
      .as[T]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment