Instantly share code, notes, and snippets.

Embed
What would you like to do?
package org.apache.spark.test.app
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
 * Shared driver for comparing different ways of grouping a pair RDD by key.
 *
 * Implementations provide the grouping strategy ([[groupKeyValueRDD]]) and the name of the
 * table that receives the results ([[outputTable]]). [[main]] generates the input, applies
 * the strategy, computes per-key statistics, and saves them.
 */
trait TestGrouping {

  /**
   * Groups the values of `keyValueRDD` by key.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return one entry per key, paired with the values collected for that key
   */
  def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])]

  /** Name of the table that [[main]] overwrites with the per-key statistics. */
  def outputTable: String

  /**
   * Runs the grouping job end to end.
   *
   * Generates 100 million standard-normal samples in 1000 partitions, scales them by 1e6,
   * truncates them to `Long`, keys each by `value / 100000`, groups them with
   * [[groupKeyValueRDD]], and writes (key, count, mean, std_dev) rows to [[outputTable]],
   * overwriting any existing table. The standard deviation is the population form
   * (divides by `count`).
   *
   * The SparkContext is always stopped before returning, even if the job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sc = new SparkContext(conf)
    try {
      val hc = new HiveContext(sc)
      import hc.implicits._ // brings `toDF` into scope for RDDs of tuples

      val longRDD: RDD[Long] =
        RandomRDDs.normalRDD(sc, 100 * 1000 * 1000, 1000)
          .map(_ * 1000 * 1000)
          .map(_.toLong)

      // Long division truncates toward zero, so key 0 covers (-100000, 100000):
      // twice the width of every other bucket.
      val keyValuePairRDD: RDD[(Int, Long)] =
        longRDD.keyBy(n => (n / 100000).toInt)

      val groupedRDD: RDD[(Int, Iterable[Long])] =
        groupKeyValueRDD(keyValuePairRDD)

      // This closure must not reference members of the enclosing trait: doing so would
      // capture `this`, which is not Serializable, and the task would fail to ship.
      val groupStatRDD: RDD[(Int, (Int, Double, Double))] =
        groupedRDD.mapValues { numbers =>
          // First pass: sum and count together, in a single traversal.
          var sum = 0L
          var count = 0
          for (num <- numbers) {
            sum += num
            count += 1
          }
          val mean = (sum * 1.0) / count

          // Second pass: accumulate squared deviations from the mean.
          var sqDevSum = 0.0
          for (num <- numbers) {
            sqDevSum += Math.pow(num - mean, 2)
          }
          val stdDev = Math.sqrt(sqDevSum / count)
          (count, mean, stdDev)
        }

      groupStatRDD
        .map { case (key, (count, mean, stdDev)) => (key, count, mean, stdDev) }
        .toDF("key", "count", "mean", "std_dev")
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable(outputTable)
    } finally {
      // Release the context whether the job succeeded or threw.
      sc.stop()
    }
  }
}
/**
 * Grouping strategy that wraps every value in a one-element `ArrayBuffer` and merges the
 * buffers per key with `reduceByKey`. Results go to the `reducebykey_arraybuffer` table.
 */
object TestReduceByKeyWithArrayBuffer extends TestGrouping {

  override def outputTable: String = "reducebykey_arraybuffer"

  /**
   * Groups values per key by appending buffers in place.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return one buffer of values per key, spread over 200 partitions
   */
  override def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])] = {
    val singletons = keyValueRDD.mapValues(value => ArrayBuffer(value))
    // `++=` appends the right-hand buffer into the left-hand one and returns the left.
    val merged = singletons.reduceByKey((acc, next) => acc ++= next, numPartitions = 200)
    // RDD is invariant in its element type, so the buffer type is widened with a cast.
    merged.asInstanceOf[RDD[(Int, Iterable[Long])]]
  }
}
/**
 * Grouping strategy that wraps every value in a one-element `ListBuffer` and merges the
 * buffers per key with `reduceByKey`. Results go to the `reducebykey_listbuffer` table.
 */
object TestReduceByKeyWithListBuffer extends TestGrouping {

  override def outputTable: String = "reducebykey_listbuffer"

  /**
   * Groups values per key by appending buffers in place.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return one buffer of values per key, spread over 200 partitions
   */
  override def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])] = {
    val singletons = keyValueRDD.mapValues(value => ListBuffer(value))
    // `++=` appends the right-hand buffer into the left-hand one and returns the left.
    val merged = singletons.reduceByKey((acc, next) => acc ++= next, numPartitions = 200)
    // RDD is invariant in its element type, so the buffer type is widened with a cast.
    merged.asInstanceOf[RDD[(Int, Iterable[Long])]]
  }
}
/**
 * Grouping strategy that wraps every value in a one-element `UnrolledBuffer` and joins the
 * buffers per key with `concat` inside `reduceByKey`. Results go to the
 * `reducebykey_unrolledbuffer` table.
 */
object TestReduceByKeyWithUnrolledBuffer extends TestGrouping {

  override def outputTable: String = "reducebykey_unrolledbuffer"

  /**
   * Groups values per key by concatenating unrolled buffers.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return one buffer of values per key, spread over 200 partitions
   */
  override def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])] = {
    val singletons = keyValueRDD.mapValues(value => mutable.UnrolledBuffer(value))
    // This strategy merges with `concat`, where the sibling buffer strategies use `++=`.
    val merged = singletons.reduceByKey((acc, next) => acc.concat(next), numPartitions = 200)
    // RDD is invariant in its element type, so the buffer type is widened with a cast.
    merged.asInstanceOf[RDD[(Int, Iterable[Long])]]
  }
}
/**
 * Grouping strategy that wraps every value in a one-element `CompactBuffer` and merges the
 * buffers per key with `reduceByKey`. Results go to the `reducebykey_compactbuffer` table.
 */
object TestReduceByKeyWithCompactBuffer extends TestGrouping {

  override def outputTable: String = "reducebykey_compactbuffer"

  /**
   * Groups values per key by appending buffers in place.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return one buffer of values per key, spread over 200 partitions
   */
  override def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])] = {
    val singletons = keyValueRDD.mapValues(value => CompactBuffer(value))
    // `++=` appends the right-hand buffer into the left-hand one and returns the left.
    val merged = singletons.reduceByKey((acc, next) => acc ++= next, numPartitions = 200)
    // RDD is invariant in its element type, so the buffer type is widened with a cast.
    merged.asInstanceOf[RDD[(Int, Iterable[Long])]]
  }
}
/**
 * Baseline grouping strategy that delegates directly to `groupByKey`. Results go to the
 * `groupbykey` table.
 */
object TestGroupingWithGroupByKey extends TestGrouping {

  override def outputTable: String = "groupbykey"

  /**
   * Groups values per key with `groupByKey`.
   *
   * @param keyValueRDD (key, value) pairs to group
   * @return the values of each key, spread over 200 partitions
   */
  override def groupKeyValueRDD(keyValueRDD: RDD[(Int, Long)]): RDD[(Int, Iterable[Long])] = {
    val grouped: RDD[(Int, Iterable[Long])] = keyValueRDD.groupByKey(numPartitions = 200)
    grouped
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment