@MansurAshraf
Last active September 18, 2015 21:37
columns: A B C D E
types: String String String Int Int
/**
Group by (A, B, C), and within each group find the row with the max of (E - D).
Then fan out from D to E: assign 1 to each value from D to E and put them in a map.
Group by (A, B) and merge all the maps.
*/
import com.twitter.algebird.Monoid // Algebird supplies the Map monoid used in the last step

rdd
  .groupBy { case (a, b, c, _, _) => (a, b, c) }
  .map {
    // What's up with this obnoxious type? In Scalding I can just
    // map over the values of a groupBy, so why is the key being passed?
    // Second, why does the value Iterable also contain the keys?
    // The keys will be the same in every single tuple. WTF
    case ((a, b, c), rows) =>
      // D is field _4 and E is field _5 of the 5-column row
      val maxDeltaBetweenEandD = rows.reduce((l, r) => if ((l._5 - l._4) > (r._5 - r._4)) l else r)
      val (_, _, _, d, e) = maxDeltaBetweenEandD
      val histogram = (d to e).map(_ -> 1).toMap
      (a, b, c, histogram)
  }
  .groupBy { case (a, b, _, _) => (a, b) }
  .map {
    case (obnoxiousKey, rows) =>
      // merge the per-(A, B, C) histograms by adding counts key by key
      val sum = Monoid.sum(rows.map(_._4))
      (obnoxiousKey._1, obnoxiousKey._2, sum)
  }
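
The steps in the comment at the top can be tried without a cluster. Below is a minimal sketch on plain Scala collections, not the Spark job itself. The names `HistogramSketch`, `mergeCounts`, `histograms`, and `sample` are made up for illustration, `mergeCounts` is a hand-rolled stand-in for the map monoid, and `maxBy` replaces the `reduce` (on ties it keeps the first row rather than the last).

```scala
object HistogramSketch {
  type Row = (String, String, String, Int, Int) // columns A, B, C, D, E

  // Add counts key by key; a hand-rolled stand-in for a map monoid.
  def mergeCounts(x: Map[Int, Int], y: Map[Int, Int]): Map[Int, Int] =
    y.foldLeft(x) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  def histograms(rows: Seq[Row]): Map[(String, String), Map[Int, Int]] =
    rows
      .groupBy { case (a, b, c, _, _) => (a, b, c) }
      .toSeq // keep one entry per (A, B, C) group; a Map would collapse equal (A, B) keys
      .map { case ((a, b, _), group) =>
        // Row with the widest (E - D) span in this (A, B, C) group.
        val (_, _, _, d, e) = group.maxBy(r => r._5 - r._4)
        // Fan out: every integer from D to E gets a count of 1.
        ((a, b), (d to e).map(_ -> 1).toMap)
      }
      .groupBy(_._1)
      .map { case (ab, hists) => ab -> hists.map(_._2).reduce(mergeCounts) }

  // Hypothetical sample rows, only for trying the sketch out.
  val sample: Seq[Row] = Seq(
    ("x", "y", "c1", 1, 3),
    ("x", "y", "c1", 2, 3),
    ("x", "y", "c2", 2, 5),
    ("x", "z", "c3", 7, 7)
  )
}
```

With `sample`, the `("x", "y")` entry merges the ranges 1 to 3 and 2 to 5, so 2 and 3 get a count of 2. On the Spark side, keying the RDD as `(key, value)` pairs and using `reduceByKey` is the usual way to avoid carrying the key inside every value, though whether that fits this job is an assumption.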