Skip to content

Instantly share code, notes, and snippets.

@pierre-borckmans
Created January 26, 2015 11:11
Show Gist options
  • Save pierre-borckmans/4853cd6d0b2f2388bf4f to your computer and use it in GitHub Desktop.
Save pierre-borckmans/4853cd6d0b2f2388bf4f to your computer and use it in GitHub Desktop.
Spark SQL 1.2.0 - Problems with self-joins on ArrayType fields
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
// Hive-backed SQL context. `sc` is assumed to be an existing SparkContext
// (e.g. the one spark-shell predefines); it is not defined in this script.
val sqlContext = new HiveContext(sc)
// Pull the context's members (sql, applySchema, ...) into scope for the rest of the script.
import sqlContext._

// Two-column schema: a non-nullable String "name" and a non-nullable array-of-String "friends"
// whose elements are allowed to be null.
val schema = {
  val nameField = StructField("name", StringType, nullable = false)
  val friendsField = StructField("friends", ArrayType(StringType, containsNull = true), nullable = false)
  StructType(Array(nameField, friendsField))
}

// Sample data written as (name, friend) pairs; each pair becomes Row(name, List(friend)).
val friends = Array(
  "Adam"    -> "Larry",
  "Thoralf" -> "Larry",
  "Larry"   -> "Adam",
  "Thoralf" -> "Pierre",
  "Karim"   -> "Pierre"
).map { case (person, friend) => Row(person, List(friend)) }

val friendsRDD = sc.parallelize(friends)
// In the first case, the Array type of the field "friends" is predefined in the schema.
val friendsSRDD1 = applySchema(friendsRDD, schema)
friendsSRDD1.registerTempTable("friends1")
// In the second case, the Array type of the field "friends" comes from a query:
// array(friends[0]) rebuilds an array from the first element of friends1.friends.
val friendsSRDD2 = sql("SELECT name, array(friends[0]) as friends FROM friends1")
friendsSRDD2.registerTempTable("friends2")
// Preconditions for the repro: "friends1" and "friends2" must have identical schemas and content,
// so any difference in the join results below cannot be attributed to the data itself.
// Each assert states which precondition failed; Predef.assert's message is by-name,
// so it is only built when the assertion fails.
assert(friendsSRDD1.schema == friendsSRDD2.schema,
  s"schemas differ:\n  friends1: ${friendsSRDD1.schema}\n  friends2: ${friendsSRDD2.schema}")
assert(friendsSRDD1.collect.deep == friendsSRDD2.collect.deep,
  "collected rows of friendsSRDD1 and friendsSRDD2 differ")
assert(sql("select * from friends1").collect.deep == sql("select * from friends2").collect.deep,
  "rows returned by `select *` on friends1 and friends2 differ")
// Runs one self-join of the form
//   SELECT * FROM <t> as alias1 <joinClause> <t> as alias2 ON (alias1.friends[0] = alias2.name)
// first on "friends1" (the reference result) and then on "friends2", printing both result sets
// between separator lines.
//   joinClause - SQL join keyword(s) spliced into the query, e.g. "FULL JOIN" or "JOIN"
//   label      - wording used in the printed headings, e.g. "full-join"
// Returns (friends1 join, friends2 join) so both results stay inspectable in the shell.
def compareSelfJoin(joinClause: String, label: String) = {
  val rule = "========================================================"
  def selfJoin(table: String) =
    sql(s"SELECT * FROM $table as alias1 $joinClause $table as alias2 ON (alias1.friends[0] = alias2.name)")
  println("\n" + rule)
  println(s"Expected $label result:")
  val expected = selfJoin("friends1")
  expected.collect.foreach(println)
  println(s"\nIncorrect $label result:")
  val incorrect = selfJoin("friends2")
  incorrect.collect.foreach(println)
  println(rule)
  (expected, incorrect)
}

// Issue 1: a self full-join on the first element of the column friends works with the first table, but not with the second
val (fullJoinFriends1, fullJoinFriends2) = compareSelfJoin("FULL JOIN", "full-join")
// Issue 2: a self inner-join on the first element of the column friends works with the first table, but not with the second
val (innerJoinFriends1, innerJoinFriends2) = compareSelfJoin("JOIN", "inner join")
// Issue 3: a self left-join on the first element of the column friends works with the first table, but not with the second
val (leftJoinFriends1, leftJoinFriends2) = compareSelfJoin("LEFT JOIN", "left join")
// Issue 4: a self right-join on the first element of the column friends works with the first table, but not with the second
val (rightJoinFriends1, rightJoinFriends2) = compareSelfJoin("RIGHT JOIN", "right join")
println("\n\n\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment