Skip to content

Instantly share code, notes, and snippets.

@invkrh
Created July 22, 2015 22:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save invkrh/9826097f48533c519b23 to your computer and use it in GitHub Desktop.
Save invkrh/9826097f48533c519b23 to your computer and use it in GitHub Desktop.
Connected Component Problem
package me.invkrh.train
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
/**
* Created with IntelliJ IDEA.
* User: invkrh
* Date: 22/07/15
* Time: 23:14
*/
object ConnectedComponent extends App {

  val conf = new SparkConf()
    .setAppName(this.getClass.getName)
    .setMaster("local[*]")
  val sc = new SparkContext(conf)

  // Directed edge list of the input graph; two connected components here:
  // {1, 2, 3, 4} and {5, 6}.
  val arcs = sc.makeRDD(Seq((1, 2), (2, 3), (2, 4), (5, 6)))

  // Make the edge relation symmetric (undirected), then lift each endpoint
  // into a singleton set. Each record is (component-so-far, frontier of
  // not-yet-absorbed neighbours).
  val input = arcs
    .flatMap(pair => Array(pair, pair.swap))
    .map { case (left, right) => (Set(left), Set(right)) }

  // NOTE: the original bound this `Unit` result to `val output`, which was
  // misleading — `foreach println` is a side effect, not a value.
  connectedComponent(input.persist()).keys.collect().foreach(println)

  /**
   * Iteratively merges partial components until a fixed point is reached.
   *
   * Each element pairs a partial component (a vertex set) with its frontier
   * of neighbours still to absorb. One step: merge frontiers of identical
   * components with `reduceByKey`, then grow each component by every
   * frontier vertex. Convergence is detected when all component keys are
   * distinct — i.e. no two records describe the same partial component, so
   * no further merging is possible.
   *
   * @param dataSet RDD of (component vertex set, frontier vertex set);
   *                expected to be persisted by the caller.
   * @return the converged RDD, whose keys are the connected components.
   */
  def connectedComponent(dataSet: RDD[(Set[Int], Set[Int])]): RDD[(Set[Int], Set[Int])] = {
    val keys = dataSet.keys
    val cnt = keys.count()
    val distCnt = keys.distinct().count()
    if (cnt != distCnt) {
      val next = dataSet
        .reduceByKey(_ union _)
        .flatMap { case (cc, neighbours) =>
          if (neighbours.nonEmpty) {
            // Grow the component by each frontier vertex; the remaining
            // frontier travels along with the enlarged component.
            neighbours.map(n => (cc + n, neighbours - n))
          } else {
            // Fully explored component: keep it with an empty frontier.
            Array((cc, Set.empty[Int]))
          }
        }
        .persist()
      // Force materialisation of `next` BEFORE evicting its parent, so the
      // lineage is not recomputed from an uncached ancestor.
      next.count()
      dataSet.unpersist(blocking = false)
      connectedComponent(next)
    } else {
      dataSet
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment