Last active
September 18, 2015 21:37
// columns: A      B      C      D   E
// types:   String String String Int Int
/**
 * Group by (A, B, C), and within each group find the row with the max of (E - D).
 * Then fan out from D to E: assign 1 to each value in that range and put them in a map.
 * Group by (A, B) and merge all the maps.
 */
rdd
  .groupBy { case (a, b, c, _, _) => (a, b, c) }
  .map {
    // What's up with this obnoxious type? In Scalding I can just
    // map over the values of a groupBy, so why is the key being passed?
    // Second, why does the value Iterable also contain the keys?
    // The keys will be the same in every single tuple. WTF
    case ((a, b, c), rows) =>
      // rows: Iterable[(A, B, C, D, E)]; keep the row with the largest (E - D)
      val maxDeltaBetweenEandD = rows.reduce((l, r) => if ((l._5 - l._4) > (r._5 - r._4)) l else r)
      val (_, _, _, d, e) = maxDeltaBetweenEandD
      val histogram = (d to e).map(_ -> 1).toMap
      (a, b, c, histogram)
  }
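  .groupBy { case (a, b, _, _) => (a, b) }
  .map {
    case ((a, b), group) =>
      // group: Iterable[(A, B, C, Hist)]; Algebird's Monoid.sum picks up the
      // Map monoid implicitly and adds the counts of matching keys
      val sum = com.twitter.algebird.Monoid.sum(group.map(_._4))
      (a, b, sum)
  }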
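For checking the logic without a cluster, here is a minimal sketch of the same two-stage pipeline on plain Scala collections instead of an RDD. The names `HistogramSketch`, `Row`, `mergeCounts`, and `histograms` are hypothetical, and `mergeCounts` is a hand-written stand-in for the map-monoid sum, assuming "merge" means adding the counts of matching keys. It uses `maxBy`, which keeps the first row on ties.

```scala
object HistogramSketch {
  // One input row: (A, B, C, D, E)
  type Row = (String, String, String, Int, Int)

  // Merge two histograms by adding the counts of matching keys
  // (hypothetical helper standing in for a map monoid).
  def mergeCounts(l: Map[Int, Int], r: Map[Int, Int]): Map[Int, Int] =
    r.foldLeft(l) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  def histograms(rows: Seq[Row]): Map[(String, String), Map[Int, Int]] = {
    // Stage 1: per (A, B, C), keep the row with the largest (E - D)
    // and fan out the range D to E into a map of value -> 1.
    val perAbc: Seq[((String, String), Map[Int, Int])] =
      rows
        .groupBy { case (a, b, c, _, _) => (a, b, c) }
        .toSeq
        .map { case ((a, b, _), group) =>
          val (_, _, _, d, e) = group.maxBy { case (_, _, _, lo, hi) => hi - lo }
          ((a, b), (d to e).map(_ -> 1).toMap)
        }

    // Stage 2: per (A, B), merge the histograms of all C groups.
    perAbc
      .groupBy(_._1)
      .map { case (key, group) => key -> group.map(_._2).reduce(mergeCounts) }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("x", "y", "c1", 1, 3),
      ("x", "y", "c1", 2, 3),
      ("x", "y", "c2", 2, 4)
    )
    println(histograms(rows))
  }
}
```

Because stage 1 emits `((a, b), histogram)` pairs directly, stage 2 only has to look at the key and the value, which avoids carrying the key columns inside every grouped tuple.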