Skip to content

Instantly share code, notes, and snippets.

@ashkrit
Created November 18, 2018 16:59
/**
 * Combines two partial [[GroupByValue]] aggregates into one.
 *
 * The argument whose `days` collection is larger is reused as the result:
 *  - its `count` is overwritten with `value1.count + value2.count`;
 *  - every day from the other argument is `add`ed to its `days`.
 * Copying from the smaller collection into the larger keeps the number of `add` calls low.
 * On a tie, `value1` is the one reused.
 *
 * NOTE(review): this mutates and returns one of its arguments (either one). That is only safe
 * if the caller — presumably a Spark `reduceByKey`-style combiner, confirm — never reuses the
 * inputs afterwards.
 *
 * @param value1 first partial aggregate
 * @param value2 second partial aggregate
 * @return whichever argument had strictly more days (else `value1`), updated in place
 */
private def mergeValues(value1: GroupByValue, value2: GroupByValue): GroupByValue = {
  val (into, from) =
    if (value2.days.size > value1.days.size) (value2, value1)
    else (value1, value2)
  // Same sum in both cases, so compute it once regardless of which instance is reused.
  val combinedCount = value1.count + value2.count
  into.count = combinedCount
  for (day <- from.days) into.days.add(day)
  into
}
/**
 * Writes aggregated rows as gzip-compressed text files under `outputRoot/now`.
 *
 * Each record becomes one tab-separated line: `key<TAB>count<TAB>day1,day2,...`. The days
 * are joined with "," in the iteration order of `value.days`.
 *
 * @param aggValue   aggregated (key, value) pairs to persist
 * @param now        name of the output sub-directory (presumably a run timestamp — confirm)
 * @param numFiles   upper bound on the number of output part files. It is passed to
 *                   `coalesce`, which merges partitions without a shuffle. The stage that
 *                   produces these lines therefore also runs with at most this many tasks.
 * @param outputRoot parent directory of the output
 * @throws IllegalArgumentException if `numFiles` is not positive
 */
private def saveData(aggValue: RDD[(String, GroupByValue)],
                     now: String,
                     numFiles: Int = 100,
                     outputRoot: String = "/data/output"): Unit = {
  require(numFiles > 0, s"numFiles must be positive, got $numFiles")
  aggValue
    // Build each line directly. A reused, synchronized StringBuffer bought nothing here,
    // because mkString already allocates a fresh string per row.
    .map { case (key, value) => s"$key\t${value.count}\t${value.days.mkString(",")}" }
    .coalesce(numFiles)
    .saveAsTextFile(s"$outputRoot/$now", classOf[GzipCodec])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment