@tmcgrath
Created December 2, 2014 14:41
Spark aggregateByKey example
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-12-02 08:40:25.812 java[2479:1607] Unable to load realm mapping info from SCDynamicStore
14/12/02 08:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.
scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect
res0: Array[(String, Int)] = Array((Abby,9), (David,11))
scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
res1: Array[(String, Int)] = Array((Abby,9), (David,11))
@Sureei commented Sep 10, 2015

Hi Sujitpal, can you please guide me on computing an average over multiple columns in the same RDD? For example:
RDD1: Key1, Key2, Avg1, Avg2, Avg3
or
RDD2: Key1, Avg1, Avg2
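
One possible approach (a hypothetical sketch, not from the original thread; the names rows and avgs and the three-column layout are made up) is to carry one sum per column plus a shared count in the accumulator, then divide at the end:

// Per-key averages over several numeric columns at once.
// Accumulator = (sum1, sum2, sum3, count); each value is (Double, Double, Double).
val rows = sc.parallelize(Seq(
  ("k1", (1.0, 2.0, 3.0)),
  ("k1", (3.0, 4.0, 5.0)),
  ("k2", (2.0, 2.0, 2.0))))
val avgs = rows.aggregateByKey((0.0, 0.0, 0.0, 0))(
  (a, v) => (a._1 + v._1, a._2 + v._2, a._3 + v._3, a._4 + 1),   // fold one row in
  (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4) // merge partials
).mapValues { case (s1, s2, s3, n) => (s1 / n, s2 / n, s3 / n) }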

@matthewadams

NB: This is my $0.02. I've written a fair amount of coursework over the years & I'm just trying to help here.

You might consider using better naming to more clearly illustrate things to newbies. See my fork at https://gist.github.com/matthewadams/b107599a08719b166400; in particular, https://gist.github.com/matthewadams/b107599a08719b166400/revisions. Elaboration follows.

The reduceByKey example,

babyNamesCSV.reduceByKey((n,c) => n + c).collect

uses (n,c) => n + c. I'd suggest something similar to the following:

babyNamesCSV.reduceByKey((v1, v2) => v1 + v2).collect

Reasons:

  • It is not clear what n & c stand for, especially when a Spark newbie is trying to associate them with values (as distinct from keys).
  • In the function, they correspond to "first value" & "second value".
  • The symmetry of (v1, v2) more closely matches the symmetry of the required signature of the function.

The aggregateByKey example,

babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

uses (k,v) => v.toInt+k & (v,k) => k+v. Here is how you might improve the aggregateByKey example for clarity:

babyNamesCSV.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect

Reasons:

  • By using k & v, the Spark newbie might easily think they stand for "key" & "value", which these most certainly do not, in either function.
  • In the first function, they correspond to "accumulator" and "value".
  • In the second function,
    • they correspond to "first value" & "second value", and
    • v.toInt+k
      • uses an unnecessary toInt call, and
      • inverts the order of the use of the variables relative to their declaration.
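
To make the accumulator-versus-value distinction concrete, here is a minimal sketch (hypothetical, not part of the original gist) in which the accumulator type (Int, Int) deliberately differs from the value type Int:

// Zero value (0, 0) = (sum, count). The first function folds one value into a
// per-partition accumulator; the second merges accumulators across partitions.
val sumAndCount = babyNamesCSV.aggregateByKey((0, 0))(
  (accum, v) => (accum._1 + v, accum._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }.collect
// e.g. Array((Abby,4.5), (David,5.5))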

Lastly, the name babyNamesCSV is a bit verbose, and there is nothing in the example that refers to or uses the comma-separated value format. I might suggest nameCounts or similar.

@rajeshwarn433

@matthewadams thanks for the clear explanation. Yes, I am new to Spark, and I misinterpreted and got confused by the above code.

@Dakshal07

val products=sc.textFile("/user/cloudera/products")
val productmap=products.map(x=>x.split(",")).map(x=>(x(1).toInt,x(4).toFloat))
productmap.take(5).foreach(println)
(2,59.98)
(2,129.99)
(2,89.99)
(2,89.99)
(2,199.99)
val countandtotal=productmap.aggregateByKey((0,0.0))((x,y)=>(x._1+1,x._2+y),(x,y)=>(x._1+y._1,x._2+y._2))
countandtotal: org.apache.spark.rdd.RDD[(Int, (Int, Double))] = ShuffledRDD[38] at aggregateByKey at <console>:31

countandtotal.take(2).foreach(println)

I want to count the number of products under each category id and the total price per category. But when I run countandtotal.take(2).foreach(println), it throws a NumberFormatException, even after I changed the initial value 0.0 to 0.0f. Please help.
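
A likely cause (a guess, not confirmed from the data): some input lines have a missing or non-numeric field at position 1 or 4, so toInt/toFloat only throws when take actually evaluates the RDD. A minimal sketch that drops such rows first, assuming the same field layout as in the code above:

// Keep only rows whose id and price fields parse as numbers.
val productmap = products
  .map(_.split(",", -1)) // limit -1 keeps trailing empty fields
  .filter(x => x.length > 4 && x(1).matches("""\d+""") && x(4).matches("""\d+(\.\d+)?"""))
  .map(x => (x(1).toInt, x(4).toFloat))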

@mmeasic commented Jun 5, 2018

@matthewadams Good explanation, I thought the same thing.

@michaelmior

Note that you could replace

babyNamesCSV.reduceByKey((n,c) => n + c).collect

with

babyNamesCSV.reduceByKey(_ + _).collect
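
(Scala's placeholder syntax expands each _ to a fresh parameter, so _ + _ is simply shorthand for (v1, v2) => v1 + v2.)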
