Sample code for the Spark PairRDDFunctions - AggregateByKey
package bbejeck.grouping
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Created by bbejeck on 7/31/15.
 *
 * Example of using AggregateByKey
 */
object AggregateByKey {

  /**
   * Runs two `aggregateByKey` examples over a small hard-coded list of
   * "key=value" strings and prints the results to standard output:
   *
   *  1. the distinct values collected for each key, and
   *  2. the number of values counted for each key.
   *
   * A SparkContext is created for the run and is always stopped before the
   * method returns, even if one of the Spark jobs fails.
   */
  def runAggregateByKeyExample(): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Grouping Examples"))
    try {
      val keysWithValuesList =
        Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
      val data = sc.parallelize(keysWithValuesList)

      // Create key value pairs: "foo=A" becomes ("foo", "A").
      // Cached because both aggregations below read the same pair RDD.
      val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

      // Example 1: collect the distinct values per key.
      // aggregateByKey permits both functions to modify and return their
      // first argument, so mutable sets are updated in place here.
      val initialSet = mutable.HashSet.empty[String]
      val addToSet = (s: mutable.HashSet[String], v: String) => s += v
      val mergePartitionSets =
        (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
      val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

      // Example 2: count the values per key (the value itself is ignored).
      val initialCount = 0
      val addToCounts = (n: Int, v: String) => n + 1
      val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
      val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

      println("Aggregate By Key unique Results")
      uniqueByKey.collect().foreach { case (key, values) =>
        println(s"$key -> ${values.mkString(",")}")
      }

      println("Aggregate By Key sum Results")
      countByKey.collect().foreach { case (key, count) =>
        println(s"$key -> $count")
      }
    } finally {
      // Release the context (and with it the cached RDD) on success or failure.
      sc.stop()
    }
  }
}

@hkxIron hkxIron commented Jun 27, 2018

Hi @bbejeck, is there a way to calculate uniqueByKey and countByKey at the same time, so that the dataset is visited only once? I have tried to modify your code to do this but failed. Do you have any good ideas? Thanks!

