Last active
November 3, 2016 10:52
-
-
Save aniketbhatnagar/2ed74613f70d2defe999c18afaa4816e to your computer and use it in GitHub Desktop.
Spark 2.0.1 Dataset encoder bug for Option[_]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkContext | |
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession} | |
object DataSetOptBug {

  /**
   * Repro/driver for the Spark 2.0.1 `Dataset` encoder issue with `Option[T]`
   * (a `None` produced inside `Dataset.map` surfaces as "Null value appeared
   * in non-nullable field" during codegen).
   *
   * Builds a Dataset of 999 longs, wraps values below 500 in `Some(DataRow)`,
   * drops the `None`s, unwraps, and prints the resulting count (expected 499).
   */
  def main(args: Array[String]): Unit = {
    // Default to a local master so the repro runs without spark-submit.
    if (System.getProperty("spark.master") == null) System.setProperty("spark.master", "local[*]")
    val sparkSession: SparkSession = SparkSession.builder.appName("AudScale Context").getOrCreate
    val sqlContext: SQLContext = sparkSession.sqlContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    import sqlContext.implicits._

    val data: Dataset[Long] = sparkContext.range(1, 1000).toDS

    // FIX: the original did `.filter(_.isEmpty).map(_.get)`, which keeps only
    // the None values and then calls .get on them — a guaranteed
    // NoSuchElementException for every surviving element. Counting the defined
    // rows requires keeping `_.isDefined` before unwrapping.
    // Also annotate Option.empty[DataRow]: a bare `Option.empty` infers
    // Option[Nothing], leaving the branch type implicit for the encoder.
    val count = data.map { i =>
      if (i < 500) Some(DataRow(i.toInt, s"i$i")) else Option.empty[DataRow]
    }.filter(_.isDefined)
      .map(_.get)
      .count()
    println(s"count = $count")
  }

  case class DataRow(id: Int, value: String)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure: Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException: Null value appeared in non-nullable field: | |
- field (class: "scala.Int", name: "id") | |
- option value class: "DataSetOptBug.DataRow" | |
- root class: "scala.Option" | |
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int). | |
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source) | |
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) | |
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) | |
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) | |
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) | |
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) | |
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) | |
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) | |
at org.apache.spark.scheduler.Task.run(Task.scala:86) | |
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) | |
at java.lang.Thread.run(Thread.java:745) |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.