Created
December 20, 2016 08:10
-
-
Save cipri7329/115bbe160fd52b61f0800c283600b2a4 to your computer and use it in GitHub Desktop.
Spark aggregation example: average trip duration per start and end terminal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example from https://courses.bigdatauniversity.com/courses/course-v1:BigDataUniversity+BD0212EN+2016/ exercises
//
// Load the trip and station CSV datasets.
//
// For each dataset:
//  - The first line is fetched with `first`. This is a Spark action, and it throws if the input is empty.
//  - That line is treated as the header, and every line equal to it is filtered out. So if the glob
//    matches several files that each start with the same header, all of those header lines are dropped.
//  - The remaining lines are split on commas and handed to the project parsers in `utils`.
//
// Continuation lines end with a trailing '.' so spark-shell / :load keeps reading the expression.

val input1 = sc.textFile("data/trips/*")
val header1 = input1.first
val trips = input1.filter(line => line != header1).
  map(line => utils.Trip.parse(line.split(",")))

val input2 = sc.textFile("data/stations/*")
val header2 = input2.first
val stations = input2.filter(line => line != header2).
  map(line => utils.Station.parse(line.split(",")))
// Average trip duration per terminal. It is computed twice: once keyed by the start terminal
// and once keyed by the end terminal.
//
// Each aggregation folds the durations of a key into a (sum, count) pair:
//  - The sum is accumulated as a Long, so totals over many trips cannot overflow Int.
//    Durations are presumably Int; the original (0, 0) accumulator only type-checks for
//    Int-like values. TODO confirm against utils.Trip.
//  - The mean divides as Double, so it is no longer truncated by integer division.
//
// aggregateByKey only emits keys that received at least one value, so the count is never 0.

val durationsByStart = trips.keyBy(_.startTerminal).mapValues(_.duration)
val durationsByEnd = trips.keyBy(_.endTerminal).mapValues(_.duration)

// Combiner shared by both aggregations: merges two partial (sum, count) pairs.
val mergeSumCount = (a: (Long, Long), b: (Long, Long)) => (a._1 + b._1, a._2 + b._2)
// Converts a (sum, count) pair into the mean as a Double.
val meanOf = (sumCount: (Long, Long)) => sumCount._1.toDouble / sumCount._2

val resultsStart = durationsByStart.aggregateByKey((0L, 0L))(
  (acc, duration) => (acc._1 + duration, acc._2 + 1), mergeSumCount)
val avgStart = resultsStart.mapValues(meanOf)

val resultsEnd = durationsByEnd.aggregateByKey((0L, 0L))(
  (acc, duration) => (acc._1 + duration, acc._2 + 1), mergeSumCount)
val avgEnd = resultsEnd.mapValues(meanOf)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment