// Spark aggregation example (gist by @cipri7329, created 2016-12-20)
// Based on the exercises from
// https://courses.bigdatauniversity.com/courses/course-v1:BigDataUniversity+BD0212EN+2016/
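// The course-provided utils.Trip and utils.Station helpers are not included in
// this gist. The sketch below is only an assumption of their shape, inferred
// from the fields used later (startTerminal, endTerminal, duration); the CSV
// column indices and the Station fields are hypothetical placeholders.
object utils {
  case class Trip(duration: Int, startTerminal: Int, endTerminal: Int)
  object Trip {
    // hypothetical column positions; adjust to the real trips CSV layout
    def parse(fields: Array[String]): Trip =
      Trip(fields(0).toInt, fields(1).toInt, fields(2).toInt)
  }

  case class Station(id: Int, name: String)
  object Station {
    // hypothetical column positions; adjust to the real stations CSV layout
    def parse(fields: Array[String]): Station =
      Station(fields(0).toInt, fields(1))
  }
}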
// Load the trips CSV files and drop the header row before parsing each line.
val input1 = sc.textFile("data/trips/*")
val header1 = input1.first // the header row, filtered out below
val trips = input1.
  filter(_ != header1).
  map(_.split(",")).
  map(utils.Trip.parse(_))
// Load the stations CSV files the same way.
val input2 = sc.textFile("data/stations/*")
val header2 = input2.first // the header row, filtered out below
val stations = input2.
  filter(_ != header2).
  map(_.split(",")).
  map(utils.Station.parse(_))
// Key each trip's duration by its start and end terminal, giving
// pair RDDs of (terminal, duration).
val durationsByStart = trips.keyBy(_.startTerminal).mapValues(_.duration)
val durationsByEnd = trips.keyBy(_.endTerminal).mapValues(_.duration)
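// With the sketched Trip above these are RDD[(Int, Int)]; a quick sanity check
// in the spark-shell (not in the original gist):
durationsByStart.take(3).foreach(println)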
// aggregateByKey builds a (sum, count) accumulator per terminal: the first
// function folds each duration into a partition-local pair, the second merges
// pairs across partitions. The average is then sum / count.
val resultsStart = durationsByStart.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avgStart = resultsStart.mapValues(i => i._1 / i._2) // integer average duration per start terminal

val resultsEnd = durationsByEnd.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avgEnd = resultsEnd.mapValues(i => i._1 / i._2) // integer average duration per end terminal
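// Run an action to materialize the results, e.g. in the spark-shell:
avgStart.take(10).foreach(println) // (startTerminal, average duration)
avgEnd.take(10).foreach(println)   // (endTerminal, average duration)

// Note that i._1 / i._2 above is integer division, so the averages are truncated.
// A possible variant keeping fractional averages (avgStartExact is not part of
// the original gist, just an illustration):
val avgStartExact = resultsStart.mapValues { case (sum, count) => sum.toDouble / count }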