Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spark 2.0.1 Data Set Bug for Option
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
object DataSetOptBug {
def main(args: Array[String]): Unit = {
if (System.getProperty("spark.master") == null) System.setProperty("spark.master", "local[*]")
val sparkSession: SparkSession = SparkSession.builder.appName("AudScale Context").getOrCreate
val sqlContext: SQLContext = sparkSession.sqlContext
val sparkContext: SparkContext = sparkSession.sparkContext
import sqlContext.implicits._
val data: Dataset[Long] = sparkContext.range(1, 1000).toDS
val count = data.map(i => if (i < 500) {
Some(DataRow(i.toInt, s"i$i"))
} else {
Option.empty
}).filter(_.isEmpty)
.map(_.get)
.count()
println(s"count = $count")
}
case class DataRow(id: Int, value: String)
}
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure: Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "id")
- option value class: "DataSetOptBug.DataRow"
- root class: "scala.Option"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment