// Source: GitHub gist by ahoy-jon, created October 14, 2016
// https://gist.github.com/ahoy-jon/36b1adcaa22fcf46a9ba056e1283ae05
// Turn the source rows into a DataFrame and expose it to SQL under the
// temp-table name "st" (queried by the aggregation below).
val f1: DataFrame = parallelize.toDF()
f1.registerTempTable("st")
// Print the rows without truncating long cells (e.g. full timestamps).
f1.show(truncate = false)
/*
+---+------+------+----------------------+------+------+
|id |value1|value2|updateAt              |value3|value4|
+---+------+------+----------------------+------+------+
|id1|t1    |r1    |2016-10-14 05:23:50.51|10    |null  |
|id1|t2    |null  |2016-10-15 05:23:50.51|12    |null  |
|id2|t2    |r1    |2016-10-15 05:23:50.51|0     |null  |
+---+------+------+----------------------+------+------+
*/
/** Associative "arg-max" combiner over `(key, value)` pairs.
  *
  * Returns whichever of `a` / `b` has the greater key, with one override:
  * a pair whose value is `null` never beats a pair whose value is non-null.
  * As a result, null values are ignored whenever a non-null one is available.
  * When both values are null (or both non-null), the greater key wins, and
  * ties on the key resolve to `b`, the right-hand argument.
  *
  * The key comparison is performed only when value-nullity does not already
  * decide the result. A null-intolerant `Ordering` (e.g. one that throws on a
  * null key) is therefore never consulted in that case.
  *
  * @tparam T key type, compared with the implicit `Ordering[T]`
  * @tparam V value type; `null` marks a missing value
  * @param a left-hand `(key, value)` pair
  * @param b right-hand `(key, value)` pair
  * @return `a` or `b`, chosen by the rules above
  */
def maxVal[T: Ordering, V](a: (T, V), b: (T, V)): (T, V) = {
  val aMissing = a._2 == null
  val bMissing = b._2 == null
  if (aMissing && !bMissing) b          // only b carries a value
  else if (!aMissing && bMissing) a     // only a carries a value
  // Same nullity on both sides: the key decides; ties go to b.
  else if (implicitly[Ordering[T]].gt(a._1, b._1)) a
  else b
}
// Wrap the (Timestamp, String) instance of maxVal as a SQL aggregate and
// register it under the name "maxVal". semigroupToUADF is defined elsewhere;
// presumably it folds maxVal over each group's rows — confirm against its definition.
val maxValUADF = semigroupToUADF(maxVal[Timestamp, String])
sqlContext.udf.register("maxVal", maxValUADF)

// Per id: the value1/value2 paired with the greatest updateAt (struct fields
// _2 = value, _1 = its updateAt), alongside plain SQL max() aggregates.
val latestValuesQuery: String =
"""select id,
maxVal(updateAt,value1)._2 as value1,
maxVal(updateAt,value2)._1 as updateAtValue2,
maxVal(updateAt,value2)._2 as value2,
max(updateAt) as updateAt,
max(value3) as value3,
max(value4) as value4
from st group by id"""
val sql2: DataFrame = sqlContext.sql(latestValuesQuery)
sql2.show(truncate = false)
/*
+---+------+----------------------+------+----------------------+------+------+
|id |value1|updateAtValue2        |value2|updateAt              |value3|value4|
+---+------+----------------------+------+----------------------+------+------+
|id1|t2    |2016-10-14 05:23:50.51|r1    |2016-10-15 05:23:50.51|12    |null  |
|id2|t2    |2016-10-15 05:23:50.51|r1    |2016-10-15 05:23:50.51|0     |null  |
+---+------+----------------------+------+----------------------+------+------+
*/
// Dump the RDD lineage behind the aggregated DataFrame to inspect its shuffle/stage layout.
val sql2Lineage: String = sql2.rdd.toDebugString
println(sql2Lineage)
// Only 2 stages (a single shuffle, ShuffledRowRDD below) instead of 4+ for the equivalent query in standard SQL
/*
(200) MapPartitionsRDD[24] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[23] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[22] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[21] at rdd at MoreSQL.scala:113 []
| ShuffledRowRDD[20] at rdd at MoreSQL.scala:113 []
+-(8) MapPartitionsRDD[19] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[18] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[17] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[16] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[15] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[14] at rdd at MoreSQL.scala:113 []
| MapPartitionsRDD[1] at rddToDataFrameHolder at MoreSQL.scala:60 []
| ParallelCollectionRDD[0] at parallelize at MoreSQL.scala:46 []
*/
// End of gist.