Created
October 31, 2019 22:48
-
-
Save ludwiggj/1fc3ac09ca698e22143e824c683e2394 to your computer and use it in GitHub Desktop.
Strange Spark dataframe behaviour when reading in data from file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
Loading data from file: | |
Input sales data: | |
+------+--------+-----+-------------------------------------------------+ | |
|shopId|game |sales|_corrupt_record | | |
+------+--------+-----+-------------------------------------------------+ | |
|1 |Monopoly|60 |null | | |
|1 |Cleudo |25 |null | | |
|2 |Monopoly|40 |null | | |
|null |null |null |{"shopId": "err", "game": "Cleudo", "sales": 75}| | |
+------+--------+-----+-------------------------------------------------+ | |
Valid sales: | |
+------+--------+-----+---------------+ | |
|shopId|game |sales|_corrupt_record| | |
+------+--------+-----+---------------+ | |
|1 |Monopoly|60 |null | | |
|1 |Cleudo |25 |null | | |
|2 |Monopoly|40 |null | | |
+------+--------+-----+---------------+ | |
Sales report (incorrect): | |
+--------+-----------+----------+ | |
|game |count(game)|salesTotal| | |
+--------+-----------+----------+ | |
|Cleudo |2 |100 | | |
|Monopoly|2 |100 | | |
+--------+-----------+----------+ | |
Sales report (take 2): | |
+--------+-----------+----------+----------------------------------------+ | |
|game |count(game)|salesTotal|allRecords | | |
+--------+-----------+----------+----------------------------------------+ | |
|Cleudo |1 |25 |[[1, Cleudo, 25,]] | | |
|Monopoly|2 |100 |[[1, Monopoly, 60,], [2, Monopoly, 40,]]| | |
+--------+-----------+----------+----------------------------------------+ | |
Translate to a DataSet (incorrect): | |
GameSales(Cleudo,100) | |
GameSales(Monopoly,100) | |
Process allRecords directly (correct result) | |
+--------+----------+ | |
|game |salesTotal| | |
+--------+----------+ | |
|Cleudo |25 | | |
|Monopoly|100 | | |
+--------+----------+ | |
All in one report: | |
+--------+----------+ | |
|game |salesTotal| | |
+--------+----------+ | |
|Cleudo |25 | | |
|Monopoly|100 | | |
+--------+----------+ | |
Process finished with exit code 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"shopId": 1, "game": "Monopoly", "sales": 60} | |
{"shopId": 1, "game": "Cleudo", "sales": 25} | |
{"shopId": 2, "game": "Monopoly", "sales": 40} | |
{"shopId": "err", "game": "Cleudo", "sales": 75} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.ludwiggj | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.types._ | |
// Target row type for the Dataset conversion: one aggregated sales total per game.
case class GameSales(game: String, salesTotal: Long)
/**
 * Demonstrates surprising Spark DataFrame results when a PERMISSIVE-mode JSON
 * read contains a corrupt record and the DataFrame is re-evaluated lazily.
 *
 * NOTE(review): the "(incorrect)" sections are intentional demonstrations.
 * The root cause appears to be column pruning: each action re-parses the JSON
 * with only the referenced columns, so when `shopId` is pruned the record with
 * `"shopId": "err"` parses cleanly, `_corrupt_record` comes back null, and the
 * bad row slips past the filter (see SPARK-21610). Caching `inputDataDF`
 * immediately after the read would make all downstream queries consistent —
 * left unfixed here on purpose so the demo still reproduces.
 */
object SparkDataframeDebug {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("SparkDataframeDebug")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      // Schema for the sales JSON; `_corrupt_record` captures unparseable rows
      // in PERMISSIVE mode. The nullable = false flags are not enforced on read.
      val salesSchema = StructType(Seq(
        StructField("shopId", LongType, nullable = false),
        StructField("game", StringType, nullable = false),
        StructField("sales", LongType, nullable = false),
        StructField("_corrupt_record", StringType)
      ))

      // Loading data from file
      println("Loading data from file:")
      val filePath =
        s"${System.getProperty("user.dir")}/testFiles"

      val inputDataDF = sparkSession.read
        .schema(salesSchema)
        .option("mode", "PERMISSIVE")
        .json(filePath)

      println("Input sales data:")
      inputDataDF.show(truncate = false)

      import org.apache.spark.sql.functions._
      import sparkSession.implicits._

      // Keep only rows that parsed cleanly. Because the DataFrame is lazy, this
      // filter is re-evaluated (and the file re-parsed) by every action below.
      val validSales = inputDataDF.filter(col("_corrupt_record").isNull)
      println("Valid sales:")
      validSales.show(truncate = false)

      // Incorrect: shopId is pruned from this query, so the "err" row parses as
      // clean on re-read and its sales of 75 leak into the Cleudo total.
      println("Sales report (incorrect):")
      val incorrectReportDF = validSales.groupBy("game")
        .agg(
          count(col("game")),
          sum(col("sales")) as "salesTotal"
        ).sort("game")
      incorrectReportDF.show(truncate = false)

      // Correct: struct("*") references every column, defeating pruning, so the
      // corrupt row is detected and filtered as expected.
      println("Sales report (take 2):")
      val reportDF = validSales.groupBy("game")
        .agg(
          count(col("game")),
          sum(col("sales")) as "salesTotal",
          collect_list(struct("*")).as("allRecords")
        ).sort("game")
      reportDF.show(truncate = false)

      // Incorrect again: the Dataset projection narrows to GameSales' fields,
      // re-enabling pruning and re-introducing the corrupt row's sales.
      println("Translate to a DataSet (incorrect):\n")
      val incorrectReportDS = reportDF.as[GameSales]
      incorrectReportDS.foreach(println(_))

      // Correct: aggregate over the already-collected allRecords structs rather
      // than re-reading the source columns.
      println("\nProcess allRecords directly (correct result)")
      val finalReport = reportDF
        .select(explode($"allRecords"))
        .select($"col.game", $"col.sales")
        .groupBy("game")
        .agg(
          sum(col("sales")) as "salesTotal"
        )
        .sort("game")
      finalReport.show(truncate = false)

      // Same technique as a single pipeline, starting from validSales.
      println("All in one report:")
      val allInOneReport = validSales.groupBy("game")
        .agg(
          collect_list(struct("*")).as("allRecords")
        )
        .select(explode($"allRecords"))
        .select($"col.game", $"col.sales")
        .groupBy("game")
        .agg(
          sum(col("sales")) as "salesTotal"
        )
        .sort("game")
      allInOneReport.show(truncate = false)
    } finally {
      // Release the local Spark context and its resources.
      sparkSession.stop()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment