@tmcgrath
Created December 2, 2014 14:41
Spark aggregateByKey example
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-12-02 08:40:25.812 java[2479:1607] Unable to load realm mapping info from SCDynamicStore
14/12/02 08:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.
scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect
res0: Array[(String, Int)] = Array((Abby,9), (David,11))
scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
res1: Array[(String, Int)] = Array((Abby,9), (David,11))
@MikeC711

Very nice way to keep it simple and show the contrast. I'm struggling to extend this to find the average age of the Abbys and the average age of the Davids. Any insights appreciated. Tx

@sujitpal

@MikeC711 - hopefully you already know how to do this by now. If not, here is the code snippet. I came here looking for the same thing as you and found it in one of the lectures on Spark Fundamentals I from BigDataUniversity.

scala> babyNamesCSV.aggregateByKey((0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    .mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2)
    .collect
res1: Array[(String, Double)] = Array((Abby,4.5), (David,5.5))

@Sureei

Sureei commented Sep 10, 2015

Hi sujitpal, can you please guide me on computing the average over multiple columns in the same RDD? For example:
RDD1: Key1, Key2, Avg1, Avg2, Avg3
or
RDD2: Key1, Avg1, Avg2
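
For what it's worth, here is a minimal sketch of one way to do that with aggregateByKey, assuming a made-up RDD keyed by a (Key1, Key2) pair with three numeric value columns; all of the names and data below are hypothetical, not from the original gist:

// Hypothetical rows: ((key1, key2), (v1, v2, v3)) with three numeric columns to average.
val rows = sc.parallelize(List(
  (("a", "x"), (1.0, 10.0, 100.0)),
  (("a", "x"), (3.0, 30.0, 300.0)),
  (("b", "y"), (2.0, 20.0, 200.0))))

// The accumulator carries a running sum for each column plus a row count.
val averages = rows.aggregateByKey((0.0, 0.0, 0.0, 0))(
    (acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + v._3, acc._4 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4))
  .mapValues { case (s1, s2, s3, n) => (s1 / n, s2 / n, s3 / n) }

averages.collect.foreach(println)
// ((a,x),(2.0,20.0,200.0))
// ((b,y),(2.0,20.0,200.0))   (order may vary)

The same pattern covers the single-key case (your RDD2) by keying on Key1 alone.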

@matthewadams

NB: This is my $0.02. I've written a fair amount of coursework over the years & I'm just trying to help here.

You might consider using better naming to more clearly illustrate things to newbies. See my fork at https://gist.github.com/matthewadams/b107599a08719b166400; in particular, https://gist.github.com/matthewadams/b107599a08719b166400/revisions. Elaboration follows.

The reduceByKey example,

babyNamesCSV.reduceByKey((n,c) => n + c).collect

uses (n,c) => n + c. I'd suggest something similar to the following:

babyNamesCSV.reduceByKey((v1, v2) => v1 + v2).collect

Reasons:

  • It is not clear what n & c stand for, especially when a Spark newbie is trying to associate them with values (as distinct from keys).
  • In the function, they correspond to "first value" & "second value".
  • The symmetry of (v1, v2) more closely matches the symmetry of the required signature of the function.

The aggregateByKey example,

babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

uses (k,v) => v.toInt+k & (v,k) => k+v. Here is how you might improve the aggregateByKey example for clarity:

babyNamesCSV.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect

Reasons:

  • By using k & v, the Spark newbie might easily think they stand for "key" & "value", which these most certainly do not, in either function.
  • In the first function, they correspond to "accumulator" and "value".
  • In the second function,
    • they correspond to "first value" & "second value", and
    • v.toInt+k
      • uses an unnecessary toInt call, and
      • inverts the order of the use of the variables relative to their declaration.

Lastly, the name babyNamesCSV is a bit verbose, and there is nothing in the example that refers to or uses the comma-separated value format. I might suggest nameCounts or similar.

@rajeshwarn433

@matthewadams thanks for the clear explanation. Yes, I am new to Spark and I had misinterpreted the above code and gotten confused in exactly that way.

@Dakshal07

val products=sc.textFile("/user/cloudera/products")
val productmap=products.map(x=>x.split(",")).map(x=>(x(1).toInt,x(4).toFloat))
productmap.take(5).foreach(println)
(2,59.98)
(2,129.99)
(2,89.99)
(2,89.99)
(2,199.99)
val countandtotal=productmap.aggregateByKey((0,0.0))((x,y)=>(x._1+1,x._2+y),(x,y)=>(x._1+y._1,x._2+y._2))
countandtotal: org.apache.spark.rdd.RDD[(Int, (Int, Double))] = ShuffledRDD[38] at aggregateByKey at <console>:31

countandtotal.take(2).foreach(println)

I want to count the number of products under each category id and the total price per category... When I print countandtotal.take(2).foreach(println) it throws a NumberFormatException, even after I changed the initial value 0.0 to 0.0f. Please help.
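
In case it helps: the map is lazy, so the NumberFormatException only surfaces when take() forces evaluation, and the usual cause is a malformed row (for example a field containing an extra comma, or an empty/non-numeric price) so that x(4) no longer parses as a number. Here is a hedged sketch of one way to confirm and work around it, assuming the same /user/cloudera/products layout as above; the isFloat helper is just for illustration:

val products = sc.textFile("/user/cloudera/products")

// Returns true only if the string parses as a Float.
def isFloat(s: String): Boolean =
  try { s.toFloat; true } catch { case _: NumberFormatException => false }

// Keep only rows with at least 5 fields and a numeric price in field 4.
val productmap = products
  .map(_.split(","))
  .filter(x => x.length > 4 && isFloat(x(4)))
  .map(x => (x(1).toInt, x(4).toFloat))

// (count, total price) per category id.
val countandtotal = productmap.aggregateByKey((0, 0.0))(
  (acc, price) => (acc._1 + 1, acc._2 + price),
  (a, b) => (a._1 + b._1, a._2 + b._2))

countandtotal.take(2).foreach(println)

Printing the rows that fail the filter, e.g. products.map(_.split(",")).filter(x => x.length <= 4 || !isFloat(x(4))).collect, should show exactly which input lines are causing the exception.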

@mmeasic

mmeasic commented Jun 5, 2018

@matthewadams Good explanation, I thought the same thing.

@michaelmior

Note that you could replace

babyNamesCSV.reduceByKey((n,c) => n + c).collect

with

babyNamesCSV.reduceByKey(_ + _).collect
