
Created December 2, 2014 14:41
Spark aggregateByKey example
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-12-02 08:40:25.812 java[2479:1607] Unable to load realm mapping info from SCDynamicStore
14/12/02 08:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.
scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect
res0: Array[(String, Int)] = Array((Abby,9), (David,11))
scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
res1: Array[(String, Int)] = Array((Abby,9), (David,11))
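If the two-function signature of aggregateByKey is confusing, it may help to see what it does with them. The sketch below is a Spark-free model, not Spark's implementation: the hypothetical helper localAggregateByKey treats each inner Seq as one partition, uses the first function (seqOp) to fold each key's values within a partition starting from the zero value, and then uses the second function (combOp) to merge the per-partition results. Splitting the four pairs into two partitions is an assumption for illustration.

```scala
object AggregateByKeyModel {
  // Hypothetical, Spark-free model of RDD.aggregateByKey: each inner Seq
  // stands in for one partition of the RDD.
  def localAggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Within each partition, fold each key's values into an accumulator
    // that starts at `zero` (this is where seqOp runs).
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Across partitions, merge the accumulators for the same key (combOp).
    perPartition.flatten.groupBy(_._1).map { case (k, pairs) =>
      k -> pairs.map(_._2).reduce(combOp)
    }
  }

  def main(args: Array[String]): Unit = {
    // The gist's four pairs, split into two partitions by assumption.
    val partitions = Seq(
      Seq(("David", 6), ("Abby", 4)),
      Seq(("David", 5), ("Abby", 5)))
    val sums = localAggregateByKey(partitions, 0)(
      (accum, v) => accum + v, // seqOp: accumulator + value within a partition
      (v1, v2) => v1 + v2)     // combOp: merge two partition accumulators
    println(sums.toList.sortBy(_._1)) // List((Abby,9), (David,11))
  }
}
```

Because the zero value seeds every key's accumulator once per partition, it should be neutral for the operation (0 for a sum). With a zero of 1 and the two-partition split above, Abby would come out as 11 rather than 9.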

@MikeC711 - hopefully you already know how to do this. If not, here is a snippet that computes the average value per key. I came here looking for the same thing as you and found it in one of the lectures of Spark Fundamentals I from BigDataUniversity.

scala> babyNamesCSV.aggregateByKey((0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
      ).mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2).collect
res1: Array[(String, Double)] = Array((Abby,4.5), (David,5.5))
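The same (sum, count) idea with the two functions pulled out into named values, which may make the roles of the accumulator and the value easier to see. To keep it runnable without a SparkContext, the hypothetical averages helper applies them to a local List, pretending each key's first value sits in one partition and the rest in another; in spark-shell you would pass addValue and mergeSums straight to aggregateByKey, as the comment shows.

```scala
object AverageByKey {
  type SumCount = (Int, Int) // (running sum, number of values seen)

  // seqOp: fold one value into a partition-local (sum, count) accumulator.
  val addValue: (SumCount, Int) => SumCount =
    (acc, value) => (acc._1 + value, acc._2 + 1)

  // combOp: merge two accumulators built on different partitions.
  val mergeSums: (SumCount, SumCount) => SumCount =
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

  // In spark-shell these could be passed as
  //   babyNamesCSV.aggregateByKey((0, 0))(addValue, mergeSums)
  // Hypothetical helper: a local List stands in for the RDD so this runs without Spark.
  def averages(pairs: List[(String, Int)]): Map[String, Double] =
    pairs.groupBy(_._1).map { case (name, kvs) =>
      // Pretend the first value landed in one partition and the rest in another.
      val (part1, part2) = kvs.map(_._2).splitAt(1)
      val (sum, count) =
        mergeSums(part1.foldLeft((0, 0))(addValue), part2.foldLeft((0, 0))(addValue))
      name -> 1.0 * sum / count
    }

  def main(args: Array[String]): Unit = {
    val pairs = List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5))
    println(averages(pairs).toList.sortBy(_._1)) // List((Abby,4.5), (David,5.5))
  }
}
```

The 1.0 * in the final step forces floating-point division; with plain Ints, 9 / 2 would give 4.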


Sureei commented Sep 10, 2015

Hi Sujitpal, can you please guide me on computing averages over multiple columns of the same RDD? For example:
RDD1: Key1, Key2, Avg1, Avg2, Avg3
RDD2: Key1, Avg1, Avg2
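One possible approach, sketched under assumptions: keep a single accumulator per key holding the row count plus one running sum per column, then divide at the end. The names (MultiColumnAverage, zero, seqOp, combOp, toAverages, averagesByKey) and the Vector[Double] row representation are hypothetical, and the sketch assumes every row has the same number of numeric columns, since zip silently truncates to the shorter side. The Spark call appears only in a comment; a local Seq stands in for the RDD so the code runs on its own.

```scala
object MultiColumnAverage {
  // Hypothetical accumulator: rows seen so far and one running sum per column.
  type Acc = (Long, Vector[Double])

  // Zero value: no rows, all sums 0.0. ASSUMPTION: every row has `columns` values.
  def zero(columns: Int): Acc = (0L, Vector.fill(columns)(0.0))

  // seqOp: add one row's column values into a partition-local accumulator.
  def seqOp(acc: Acc, row: Vector[Double]): Acc =
    (acc._1 + 1, acc._2.zip(row).map { case (s, x) => s + x })

  // combOp: merge accumulators built on different partitions.
  def combOp(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2.zip(b._2).map { case (s, t) => s + t })

  // Turn (count, sums) into per-column averages.
  def toAverages(acc: Acc): Vector[Double] = acc._2.map(_ / acc._1)

  // With a hypothetical rows: RDD[((String, String), Vector[Double])] in spark-shell:
  //   rows.aggregateByKey(zero(3))(seqOp, combOp).mapValues(toAverages)
  // The local version below folds a plain Seq instead, so it runs without Spark.
  def averagesByKey[K](rows: Seq[(K, Vector[Double])], columns: Int): Map[K, Vector[Double]] =
    rows.groupBy(_._1).map { case (k, kvs) =>
      k -> toAverages(kvs.map(_._2).foldLeft(zero(columns))(seqOp))
    }

  def main(args: Array[String]): Unit = {
    // RDD1-style data: composite key (Key1, Key2) and three numeric columns.
    val rdd1Like = Seq(
      (("k1", "x"), Vector(1.0, 10.0, 100.0)),
      (("k1", "x"), Vector(3.0, 20.0, 300.0)),
      (("k2", "y"), Vector(5.0, 5.0, 5.0)))
    println(averagesByKey(rdd1Like, 3))
    // RDD2-style data: a single key and two numeric columns.
    val rdd2Like = Seq(("k1", Vector(2.0, 4.0)), ("k1", Vector(4.0, 8.0)))
    println(averagesByKey(rdd2Like, 2)) // Map(k1 -> Vector(3.0, 6.0))
  }
}
```

Using a Vector lets the same three functions serve both the three-column RDD1 and the two-column RDD2 layouts; a tuple accumulator such as (Long, Double, Double, Double) would be more type-safe but fixed to one width.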


NB: This is my $0.02. I've written a fair amount of coursework over the years & I'm just trying to help here.

You might consider using better naming to illustrate things more clearly to newbies (see my fork at …). Elaboration follows.

The reduceByKey example,

babyNamesCSV.reduceByKey((n,c) => n + c).collect

uses (n,c) => n + c. I'd suggest something similar to the following:

babyNamesCSV.reduceByKey((v1, v2) => v1 + v2).collect


  • It is not clear what n & c stand for, especially when a Spark newbie is trying to associate them with values (as distinct from keys).
  • In the function, they correspond to "first value" & "second value".
  • The symmetry of (v1, v2) more closely matches the symmetry of the required signature of the function.

The aggregateByKey example,

babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

uses (k,v) => v.toInt+k & (v,k) => k+v. Here is how you might improve the aggregateByKey example for clarity:

babyNamesCSV.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect


  • By using k & v, the Spark newbie might easily think they stand for "key" & "value", which they most certainly do not, in either function.
  • In the first function, they correspond to "accumulator" and "value", and v.toInt+k
    • uses an unnecessary toInt call (the values are already Ints), and
    • inverts the order of the use of the variables relative to their declaration.
  • In the second function, they correspond to "first value" & "second value", and k+v likewise inverts the declared order.

Lastly, the name babyNamesCSV is a bit verbose, and there is nothing in the example that refers to or uses the comma-separated value format. I might suggest nameCounts or similar.


@matthewadams Thanks for the clear explanation. Yes, I am new to Spark, and I misinterpreted the code above in exactly that way and got confused.


val products = sc.textFile("/user/cloudera/products")
val countandtotal = productmap.aggregateByKey((0, 0.0))((x, y) => (x._1 + 1, x._2 + y), (x, y) => (x._1 + y._1, x._2 + y._2))
countandtotal: org.apache.spark.rdd.RDD[(Int, (Int, Double))] = ShuffledRDD[38] at aggregateByKey at <console>:31


I want to count the number of products under each category id and the total price per category. When I print countandtotal.take(2).foreach(println), it throws a NumberFormatException. I even changed the initial value from 0.0 to 0.0f. Please help.
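About the NumberFormatException: that exception is thrown when a String that does not hold a valid number is converted with toInt or toDouble, so changing the zero value from 0.0 to 0.0f would not affect it. The conversion most likely happens in the step that builds productmap (not shown above), and because Spark transformations are lazy, that code only runs when an action such as take(2) is called, which is why the error surfaces there. A minimal sketch of defensive parsing follows; the column positions, the sample lines, and the names parse and countAndTotal are assumptions, not the real layout of the products file.

```scala
import scala.util.Try

object CountAndTotalByCategory {
  // ASSUMPTION: hypothetical 0-based positions of the category id and the price
  // in each comma-separated line; adjust them to the real file layout.
  val CategoryIdx = 1
  val PriceIdx = 4

  // Parse one line into (categoryId, price). Returns None instead of throwing when
  // a field is missing or is not a number (e.g. a header row or an empty price).
  def parse(line: String): Option[(Int, Double)] = {
    val fields = line.split(",", -1) // -1 keeps trailing empty fields
    Try((fields(CategoryIdx).trim.toInt, fields(PriceIdx).trim.toDouble)).toOption
  }

  // In spark-shell, the question's productmap could be built as
  //   val productmap = products.flatMap(line => parse(line).toList)
  // and then passed to the same aggregateByKey((0, 0.0))(...) call.
  // Hypothetical local helper: a Seq of lines stands in for the RDD.
  def countAndTotal(lines: Seq[String]): Map[Int, (Int, Double)] =
    lines.flatMap(line => parse(line).toList)
      .groupBy(_._1)
      .map { case (category, pairs) =>
        // Same seqOp as in the question: (count + 1, total + price).
        category -> pairs.map(_._2).foldLeft((0, 0.0))((x, y) => (x._1 + 1, x._2 + y))
      }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "product_id,category_id,name,description,price,image", // header: skipped
      "1,2,Widget,,10.5,img",
      "2,2,Gadget,,20.25,img",
      "3,3,Broken,,n/a,img") // non-numeric price: skipped
    println(countAndTotal(lines)) // Map(2 -> (2,30.75))
  }
}
```

Dropping malformed lines silently can hide data problems; counting or logging the rejected lines before aggregating would be a reasonable refinement.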


mmeasic commented Jun 5, 2018

@matthewadams Good explanation, I thought the same thing.


Note that you could replace

babyNamesCSV.reduceByKey((n,c) => n + c).collect

with

babyNamesCSV.reduceByKey(_ + _).collect
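The placeholder form works because each underscore stands for the next parameter in order, so _ + _ is shorthand for (v1, v2) => v1 + v2. The small plain-Scala check below (the object and value names are illustrative) shows the two forms behave the same; the aggregateByKey line is left as a comment since it needs a SparkContext.

```scala
object PlaceholderSyntax {
  // Illustrative names: both values are the same two-argument sum function.
  val sumExplicit: (Int, Int) => Int = (v1, v2) => v1 + v2
  val sumPlaceholder: (Int, Int) => Int = _ + _ // first _ is v1, second _ is v2

  def main(args: Array[String]): Unit = {
    val values = List(6, 4, 5, 5)
    println(values.reduce(sumExplicit))    // 20
    println(values.reduce(sumPlaceholder)) // 20
    // The same shorthand fits both functions of the gist's aggregateByKey call:
    //   babyNamesCSV.aggregateByKey(0)(_ + _, _ + _).collect
  }
}
```

The shorthand only applies when each parameter is used exactly once and in declaration order; a function such as (acc, value) => (acc._1 + value, acc._2 + 1) from the averaging example uses acc twice, so it has to stay in the named-parameter form.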
