@ludwiggj
Created October 31, 2019 22:48
Strange Spark dataframe behaviour when reading in data from file
...
Loading data from file:
Input sales data:
+------+--------+-----+------------------------------------------------+
|shopId|game    |sales|_corrupt_record                                 |
+------+--------+-----+------------------------------------------------+
|1     |Monopoly|60   |null                                            |
|1     |Cleudo  |25   |null                                            |
|2     |Monopoly|40   |null                                            |
|null  |null    |null |{"shopId": "err", "game": "Cleudo", "sales": 75}|
+------+--------+-----+------------------------------------------------+
Valid sales:
+------+--------+-----+---------------+
|shopId|game    |sales|_corrupt_record|
+------+--------+-----+---------------+
|1     |Monopoly|60   |null           |
|1     |Cleudo  |25   |null           |
|2     |Monopoly|40   |null           |
+------+--------+-----+---------------+
Sales report (incorrect):
+--------+-----------+----------+
|game    |count(game)|salesTotal|
+--------+-----------+----------+
|Cleudo  |2          |100       |
|Monopoly|2          |100       |
+--------+-----------+----------+
Sales report (take 2):
+--------+-----------+----------+----------------------------------------+
|game    |count(game)|salesTotal|allRecords                              |
+--------+-----------+----------+----------------------------------------+
|Cleudo  |1          |25        |[[1, Cleudo, 25,]]                      |
|Monopoly|2          |100       |[[1, Monopoly, 60,], [2, Monopoly, 40,]]|
+--------+-----------+----------+----------------------------------------+
Translate to a DataSet (incorrect):
GameSales(Cleudo,100)
GameSales(Monopoly,100)
Process allRecords directly (correct result)
+--------+----------+
|game    |salesTotal|
+--------+----------+
|Cleudo  |25        |
|Monopoly|100       |
+--------+----------+
All in one report:
+--------+----------+
|game    |salesTotal|
+--------+----------+
|Cleudo  |25        |
|Monopoly|100       |
+--------+----------+
Process finished with exit code 0
{"shopId": 1, "game": "Monopoly", "sales": 60}
{"shopId": 1, "game": "Cleudo", "sales": 25}
{"shopId": 2, "game": "Monopoly", "sales": 40}
{"shopId": "err", "game": "Cleudo", "sales": 75}
package org.ludwiggj

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

case class GameSales(game: String, salesTotal: Long)

object SparkDataframeDebug {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("SparkDataframeDebug")
      .config("spark.master", "local")
      .getOrCreate()

    // shopId, game and sales are declared non-nullable; _corrupt_record
    // holds the raw text of any record that fails to parse.
    val salesSchema = StructType(Seq(
      StructField("shopId", LongType, nullable = false),
      StructField("game", StringType, nullable = false),
      StructField("sales", LongType, nullable = false),
      StructField("_corrupt_record", StringType)
    ))

    // Loading data from file
    println("Loading data from file:")
    val filePath =
      s"${System.getProperty("user.dir")}/testFiles"

    val inputDataDF = sparkSession.read
      .schema(salesSchema)
      .option("mode", "PERMISSIVE")
      .json(filePath)

    println("Input sales data:")
    inputDataDF.show(truncate = false)

    // Keep only the rows that parsed cleanly.
    val validSales = inputDataDF.filter(col("_corrupt_record").isNull)

    println("Valid sales:")
    validSales.show(truncate = false)

    // Expected Cleudo to have 1 record and 25 sales, but the output shows 2 and 100.
    println("Sales report (incorrect):")
    val incorrectReportDF = validSales.groupBy("game")
      .agg(
        count(col("game")),
        sum(col("sales")) as "salesTotal"
      ).sort("game")
    incorrectReportDF.show(truncate = false)

    // Same aggregation, but also collecting every column of each row;
    // the counts and totals are now right.
    println("Sales report (take 2):")
    val reportDF = validSales.groupBy("game")
      .agg(
        count(col("game")),
        sum(col("sales")) as "salesTotal",
        collect_list(struct("*")).as("allRecords")
      ).sort("game")
    reportDF.show(truncate = false)

    // Mapping to GameSales reads only game and salesTotal;
    // the printed totals are wrong again.
    println("Translate to a DataSet (incorrect):\n")
    import sparkSession.implicits._
    val incorrectReportDS = reportDF.as[GameSales]
    incorrectReportDS.foreach(println(_))

    // Rebuild the totals from the collected rows instead.
    println("\nProcess allRecords directly (correct result)")
    val finalReport = reportDF
      .select(explode($"allRecords"))
      .select($"col.game", $"col.sales")
      .groupBy("game")
      .agg(
        sum(col("sales")) as "salesTotal"
      )
      .sort("game")
    finalReport.show(truncate = false)

    // The collect, explode and re-aggregate steps as a single pipeline.
    println("All in one report:")
    val allInOneReport = validSales.groupBy("game")
      .agg(
        collect_list(struct("*")).as("allRecords")
      )
      .select(explode($"allRecords"))
      .select($"col.game", $"col.sales")
      .groupBy("game")
      .agg(
        sum(col("sales")) as "salesTotal"
      )
      .sort("game")
    allInOneReport.show(truncate = false)
  }
}
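
A possible explanation, offered as an assumption rather than something the gist states: Spark's JSON file source appears to parse only the columns a query needs. The incorrect reports need just game, sales and _corrupt_record, so shopId is never parsed, the "err" value never fails, and the fourth record passes the isNull filter. Referencing every column via struct("*") forces a full parse, which would explain why the later reports come out right. The Spark SQL migration guide (2.2 to 2.3) recommends caching or saving the parsed result before querying the corrupt-record column. A minimal sketch of that workaround follows; the object and method names (CachedSalesReport, salesReport) are hypothetical, while the schema and the testFiles path mirror the listing above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.types._

// Hypothetical helper, not part of the original gist.
object CachedSalesReport {

  // Same schema as the listing above.
  val salesSchema: StructType = StructType(Seq(
    StructField("shopId", LongType, nullable = false),
    StructField("game", StringType, nullable = false),
    StructField("sales", LongType, nullable = false),
    StructField("_corrupt_record", StringType)
  ))

  def salesReport(spark: SparkSession, path: String): DataFrame = {
    val parsed = spark.read
      .schema(salesSchema)
      .option("mode", "PERMISSIVE")
      .json(path)
      // Assumption: caching stores rows parsed against the full schema,
      // so later queries cannot skip parsing shopId.
      .cache()

    parsed
      .filter(col("_corrupt_record").isNull)
      .groupBy("game")
      .agg(sum(col("sales")) as "salesTotal")
      .sort("game")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CachedSalesReport")
      .config("spark.master", "local")
      .getOrCreate()

    val path = s"${System.getProperty("user.dir")}/testFiles"
    salesReport(spark, path).show(truncate = false)
    spark.stop()
  }
}
```

If the assumption holds, the report should match the "correct result" tables in the output above (Cleudo 25, Monopoly 100) without the collect_list and explode round trip. Behaviour may differ between Spark versions, so it is worth checking against the version in use.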