Created
July 1, 2017 20:46
-
-
Save tsaastam/7fdccef8fd8a80f84783740ffd9ecdb9 to your computer and use it in GitHub Desktop.
Spark Dataset union & column order
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Illustration of https://issues.apache.org/jira/browse/SPARK-21109:
// Dataset.union resolves columns by POSITION, not by name. Two Datasets of the
// same case-class type can therefore union incorrectly (or throw) when an
// upstream transformation — here groupBy/agg — has reordered the columns.
// See also https://lobotomys.blogspot.co.uk/2017/07/spark-union-column-order-issue.html
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// If running inside spark-shell, `spark`, `sc` and the implicits already
// exist — skip the next four lines.
val spark = SparkSession.builder().master("local").appName("my-spark-shell").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
val sqlContext = new SQLContext(sc) // deprecated since Spark 2.0; kept only for parity with the original gist

// The declared column order of the schema is (id, count, name).
case class Thing(id: String, count: Long, name: String)

val things1: Dataset[Thing] = sc.parallelize(Seq(
  Thing("thing1", 123, "some_thing"),
  Thing("thing2", 101, "another_thing"),
  Thing("thing2", 100, "another_thing")
)).toDS

val things2: Dataset[Thing] = sc.parallelize(Seq(
  Thing("foo",  5, "different_thing"),
  Thing("foo", 15, "different_thing"),
  Thing("bar",  6, "whatever_thing")
)).toDS

// Both sides still have the declared (id, count, name) order: works as expected.
things1.union(things2).show

// groupBy/agg produces columns in order (id, name, count) — note that .as[Thing]
// compiles fine even though the physical column order no longer matches the
// case class declaration.
val agg1: Dataset[Thing] = things1.groupBy($"id", $"name").agg(sum("count").as("count")).as[Thing]
val agg2: Dataset[Thing] = things2.groupBy($"id", $"name").agg(sum("count").as("count")).as[Thing]

// Works as expected — but note the different column order in the output!
agg1.show

// Both sides share the same (reordered) layout, so the positional union is
// consistent and this happens to work.
agg1.union(agg2).show

// Explodes: agg1 is (id, name, count) while things2 is (id, count, name), and
// union matches purely by position, so Long `count` lines up with String `name`.
// NOTE(review): on Spark 2.3+ the fix is agg1.unionByName(things2), which
// resolves columns by name instead of position.
agg1.union(things2).show

// Works fine: map(identity) re-encodes each row through the Thing encoder,
// which restores the declared (id, count, name) column order before the union.
agg1.map(identity).union(things2).show
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment