Spark aggregateByKey example
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-12-02 08:40:25.812 java[2479:1607] Unable to load realm mapping info from SCDynamicStore
14/12/02 08:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.
scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect
res0: Array[(String, Int)] = Array((Abby,9), (David,11))
scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
res1: Array[(String, Int)] = Array((Abby,9), (David,11))
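
A note on what the two calls show: reduceByKey merges the values for each key with a single function, while aggregateByKey takes a zero value for the per-key accumulator, a seqOp that folds one value into the partition-local accumulator, and a combOp that merges accumulators from different partitions. A minimal annotated sketch of the same sum (the variable names are mine, not from the transcript above):

babyNamesCSV.aggregateByKey(0)(
  (accum, value) => accum + value,      // seqOp: fold one value into the per-key accumulator
  (accum1, accum2) => accum1 + accum2   // combOp: merge accumulators from different partitions
).collect
// Array((Abby,9), (David,11)), the same result as reduceByKey above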

MikeC711 commented Jul 23, 2015

Very nice way to keep it simple and show the contrast. I'm struggling (hope to extend this) to find the average age of the Abbys and the average age of the Davids. Any insights appreciated. Tx

sujitpal commented Aug 18, 2015

@MikeC711 - hopefully you already know how to do this by now. If not, here is a code snippet. I came here looking for the same thing as you and found it in one of the lectures on Spark Fundamentals I from BigDataUniversity.

scala> babyNamesCSV.aggregateByKey((0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    .mapValues(sumCount => 1.0 * sumCount._1 / sumCount._2)
    .collect
res1: Array[(String, Double)] = Array((Abby,4.5), (David,5.5))

Sureei commented Sep 10, 2015

Hi sujitpal, can you please guide me on computing averages over multiple columns in the same RDD? For example:
RDD1: Key1, Key2, Avg1, Avg2, Avg3
or
RDD2: Key1, Avg1, Avg2
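
A possible sketch for this, along the lines of sujitpal's snippet: keep one (sum, sum, count) accumulator per key and divide in mapValues. The sample data and column layout below are illustrative only, assuming the values are already paired up per key:

val multi = sc.parallelize(List(("Key1", (20.0, 1.7)), ("Key1", (30.0, 1.9)), ("Key2", (40.0, 1.6))))

multi.aggregateByKey((0.0, 0.0, 0))(
    (acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + 1),   // sum each column, count rows
    (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3))      // merge partial sums and counts
  .mapValues(t => (t._1 / t._3, t._2 / t._3))                 // (Avg1, Avg2) per key
  .collect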

matthewadams commented Nov 6, 2015

NB: This is my $0.02. I've written a fair amount of coursework over the years & I'm just trying to help here.

You might consider using better naming to more clearly illustrate things to newbies. See my fork at https://gist.github.com/matthewadams/b107599a08719b166400; in particular, https://gist.github.com/matthewadams/b107599a08719b166400/revisions. Elaboration follows.

The reduceByKey example,

babyNamesCSV.reduceByKey((n,c) => n + c).collect

uses (n,c) => n + c. I'd suggest something similar to the following:

babyNamesCSV.reduceByKey((v1, v2) => v1 + v2).collect

Reasons:

  • It is not clear what n & c stand for, especially when a Spark newbie is trying to associate them with values (as distinct from keys).
  • In the function, they correspond to "first value" & "second value".
  • The symmetry of (v1, v2) more closely matches the symmetry of the required signature of the function.

The aggregateByKey example,

babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

uses (k,v) => v.toInt+k & (v,k) => k+v. Here is how you might improve the aggregateByKey example for clarity:

babyNamesCSV.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect

Reasons:

  • By using k & v, the Spark newbie might easily think they stand for "key" & "value", which these most certainly do not, in either function.
  • In the first function, they correspond to "accumulator" and "value".
  • In the second function,
    • they correspond to "first value" & "second value", and
    • v.toInt+k
      • uses an unnecessary toInt call, and
      • inverts the order of the use of the variables relative to their declaration.

Lastly, the name babyNamesCSV is a bit verbose, and there is nothing in the example that refers to or uses the comma-separated value format. I might suggest nameCounts or similar.
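
Putting those suggestions together, the two examples might read like this (a sketch applying the renames above, not code copied from the linked fork):

val nameCounts = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))

nameCounts.reduceByKey((v1, v2) => v1 + v2).collect
nameCounts.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect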

rajeshwarn433 commented Mar 4, 2018

@matthewadams thanks for the clear explanation. Yes, I am new to Spark, and I misinterpreted the code above and got confused exactly as you described.

Dakshal07 commented May 20, 2018

val products=sc.textFile("/user/cloudera/products")
val productmap=products.map(x=>x.split(",")).map(x=>(x(1).toInt,x(4).toFloat))
productmap.take(5).foreach(println)
(2,59.98)
(2,129.99)
(2,89.99)
(2,89.99)
(2,199.99)
val countandtotal=productmap.aggregateByKey((0,0.0))((x,y)=>(x._1+1,x._2+y),(x,y)=>(x._1+y._1,x._2+y._2))
countandtotal: org.apache.spark.rdd.RDD[(Int, (Int, Double))] = ShuffledRDD[38] at aggregateByKey at <console>:31

countandtotal.take(2).foreach(println)

I want to count the number of products under each category id and the total price per category. When I print countandtotal.take(2).foreach(println) it throws a NumberFormatException, even after I changed the initial value 0.0 to 0.0f. Please help.
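
One likely cause (an assumption, since the input data isn't shown) is that some lines in the products file have an empty or non-numeric price field, so x(4).toFloat fails; because RDDs are evaluated lazily, the exception only surfaces when take(2) forces the computation. A defensive sketch that skips such rows before aggregating:

val products = sc.textFile("/user/cloudera/products")

// Keep only rows whose price field actually parses; assumes the same layout as
// above (category id in column 1, price in column 4).
val productmap = products
  .map(_.split(",", -1))
  .filter(f => f.length > 4 && scala.util.Try(f(4).toDouble).isSuccess)
  .map(f => (f(1).toInt, f(4).toDouble))

val countandtotal = productmap.aggregateByKey((0, 0.0))(
  (acc, price) => (acc._1 + 1, acc._2 + price),   // count and sum prices within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2))           // merge partition results

countandtotal.take(2).foreach(println)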

mmeasic commented Jun 5, 2018

@matthewadams Good explanation, I thought the same thing.

michaelmior commented Jun 5, 2018

Note that you could replace

babyNamesCSV.reduceByKey((n,c) => n + c).collect

with

babyNamesCSV.reduceByKey(_ + _).collect
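
The same placeholder syntax also works for the aggregateByKey version above, since both of its functions just add their two arguments:

babyNamesCSV.aggregateByKey(0)(_ + _, _ + _).collect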