@dixitm20
Last active July 18, 2024 10:30
Assignment For Data Engineer

Spark: Scala / PySpark Exercise

Create a Spark application written in Scala or PySpark that reads the provided signals dataset, processes the data, and stores the entire output as specified below.

For each entity_id in the signals dataset, find the item_id with the oldest month_id and the item_id with the newest month_id. In some cases this may be the same item. If there are two different items with the same month_id, take the item with the lower item_id. Finally, sum the signal_count values for each entity and output the result as total_signals. The correct output should contain one row per unique entity_id.

Requirements:

  1. Create a Scala SBT project or a PySpark project (if you know Scala, please use it, as we give Scala submissions higher preference).
  2. Use the Spark Scala/PySpark API and DataFrames/Datasets
    • Please do not use Spark SQL with a SQL string!
    • If you write spark.sql( “select ….” ) you are doing it wrong!!
  3. Produce a single Parquet output file, regardless of parallelism during processing
  4. Hint: Use window analytics functions to compute the final output in a single query (a sketch of this approach appears after the example output below).

Input:

entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer

Output:

entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer
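
For reference, the schemas above map onto Spark types roughly as follows (a sketch only; the field names and types come directly from the listings above, nothing else is implied about the data):

import org.apache.spark.sql.types._

// Input columns, as listed above.
val inputSchema = StructType(Seq(
  StructField("entity_id", LongType),
  StructField("item_id", IntegerType),
  StructField("source", IntegerType),
  StructField("month_id", IntegerType),
  StructField("signal_count", IntegerType)
))

// Expected output columns: exactly one row per unique entity_id.
val outputSchema = StructType(Seq(
  StructField("entity_id", LongType),
  StructField("oldest_item_id", IntegerType),
  StructField("newest_item_id", IntegerType),
  StructField("total_signals", IntegerType)
))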

Example partial output:

+----------+--------------+--------------+-------------+
| entity_id|oldest_item_id|newest_item_id|total_signals|
+----------+--------------+--------------+-------------+
| 190979372|             2|             1|           20|
| 220897278|             2|             1|           66|
|1146753107|             2|             0|           22|
| 224619015|             0|             3|           12|
| 118083189|             5|             3|          234|
|    162371|             4|             2|           29|
|    555304|             0|             2|            9|
| 118634684|             2|             3|           17|
| 213956643|         10000|             1|           17|
+----------+--------------+--------------+-------------+

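As hinted in requirement 4, the final output can be computed with window analytics functions in a single query. The following is a minimal Scala sketch, assuming the input has already been read into a DataFrame named signals with the columns listed above (the name signals and the output path are placeholders; the read and the session setup are omitted):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Rank rows within each entity: ascending month_id for the oldest item,
// descending month_id for the newest; ties on month_id go to the lower item_id.
val oldestRank = row_number().over(
  Window.partitionBy("entity_id").orderBy(col("month_id").asc, col("item_id").asc))
val newestRank = row_number().over(
  Window.partitionBy("entity_id").orderBy(col("month_id").desc, col("item_id").asc))

val result = signals
  .withColumn("oldest_rank", oldestRank)
  .withColumn("newest_rank", newestRank)
  .groupBy("entity_id")
  .agg(
    // Only the rank-1 row carries a non-null value, so max() picks it out.
    max(when(col("oldest_rank") === 1, col("item_id"))).as("oldest_item_id"),
    max(when(col("newest_rank") === 1, col("item_id"))).as("newest_item_id"),
    sum("signal_count").as("total_signals")
  )

// Requirement 3: coalesce to one partition so a single Parquet file is written.
result.coalesce(1).write.parquet("output_path") // output_path is a placeholder

A struct-based aggregation (min/max of struct(month_id, item_id)) could avoid the windows, but the tie-break rule makes max(struct(...)) pick the higher item_id on the newest side, so the window form above is the more direct fit for this spec.
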
Submission Guidelines:

Please submit the entire Scala SBT / PySpark project with all code necessary to build and run the app. You can bundle it as a zip, a tarball, or a GitHub link. Also include a copy of the output file generated by running the app.

@iamyashsingh

package pack

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object obj {

  def main(args: Array[String]): Unit = {

    // Only needed when running locally on Windows with winutils installed at this path.
    System.setProperty("hadoop.home.dir", "C://data/hadoop")

    val spark = SparkSession.builder()
      .appName("first")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val data = spark.read
      .parquet("file:///c:/data/data.parquet") // Adjust the path as needed

    // Oldest item per entity: order by month_id ascending, breaking ties on
    // month_id with the lower item_id.
    val oldestWindow = Window.partitionBy("entity_id")
      .orderBy(col("month_id").asc, col("item_id").asc)

    // Newest item per entity: order by month_id descending, again breaking
    // ties with the lower item_id.
    val newestWindow = Window.partitionBy("entity_id")
      .orderBy(col("month_id").desc, col("item_id").asc)

    // first() over each ordered window yields a value that is constant within
    // the entity partition, so any row of the group can be taken after grouping.
    val resultDF = data
      .withColumn("oldest_item_id", first("item_id").over(oldestWindow))
      .withColumn("newest_item_id", first("item_id").over(newestWindow))
      .groupBy("entity_id")
      .agg(
        first("oldest_item_id").as("oldest_item_id"),
        first("newest_item_id").as("newest_item_id"),
        sum("signal_count").as("total_signals")
      )

    // Coalesce to one partition so a single Parquet output file is produced.
    resultDF.coalesce(1).write.mode("overwrite").parquet("file:///c:/data/output")

    spark.stop()
  }
}
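
A quick way to spot-check the result against the example output in the gist is to read the written Parquet back (the path below matches the output location used in the code above):

spark.read.parquet("file:///c:/data/output").show(20, false)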
