Skip to content

Instantly share code, notes, and snippets.

@whiter4bbit
Created March 10, 2012 13:31
Show Gist options
  • Save whiter4bbit/2011424 to your computer and use it in GitHub Desktop.
Save whiter4bbit/2011424 to your computer and use it in GitHub Desktop.
Dijkstra's algorithm using Scalding
import com.twitter.scalding._
/**
 * One relaxation pass of an iterative shortest-path computation, expressed as a Scalding job.
 *
 * Arguments read from `args`:
 *  - `input`     – TSV with the columns (node, dist, adjacent); `adjacent` is a
 *                  `:`-separated list of neighbour node names.
 *  - `output`    – TSV path this pass writes (node, min, adjacent) to.
 *  - `temp`      – TSV path holding the result of the previous pass; read when
 *                  `iteration > 0`, and used as the output path of the next pass.
 *  - `diff`      – TSV path that receives the number of nodes whose distance changed;
 *                  written when `iteration > 0`.
 *  - `iteration` – zero-based pass counter, defaults to `0`.
 *
 * Every edge is given a cost of 1 (`dist + 1`). The job re-submits itself through
 * [[next]] until a pass changes no distance.
 */
class DijkstraJob(args: Args) extends Job(args) {
  /** Zero-based index of the current pass. */
  val iteration: Int = args.getOrElse("iteration", "0").toInt

  Tsv(args("input"), ('node, 'dist, 'adjacent))
    .read
    .flatMap(('node, 'dist, 'adjacent) -> ('node, 'dist, 'adjacent)) { p: (String, Int, String) =>
      val (node, distance, adjacent) = p
      // NOTE(review): an empty TSV column may arrive as null depending on the Cascading
      // scheme configuration; normalise it so the split below cannot throw.
      val neighbourList = Option(adjacent).getOrElse("")
      // Saturate instead of wrapping: Int.MaxValue + 1 would become Int.MinValue and
      // then win every min() comparison in the fold below.
      val neighbourDistance = if (distance == Int.MaxValue) Int.MaxValue else distance + 1
      // "".split(":") yields Array(""), so drop empty tokens; otherwise every node
      // without outgoing edges would emit a tuple for a node named "".
      val candidates = neighbourList.split(":").filter(_.nonEmpty).map { neighbour: String =>
        (neighbour, neighbourDistance, "")
      }
      // Re-emit the node itself so its current distance and adjacency list survive the pass.
      (node, distance, neighbourList) +: candidates
    }.project(('node, 'dist, 'adjacent))
    .groupBy('node) {
      // Keep the smallest distance seen for the node. Only the node's own tuple carries a
      // non-empty adjacency string, so concatenation restores the adjacency list.
      _.foldLeft(('dist, 'adjacent) -> ('min, 'adjacent))((Int.MaxValue, "")) { (best: (Int, String), current: (Int, String)) =>
        (math.min(best._1, current._1), best._2 + current._2)
      }
    }.write(Tsv(args("output")))
    .then { pipe : RichPipe =>
      if (iteration > 0) {
        // Compare this pass with the previous one and count the nodes whose distance changed.
        Tsv(args("temp"), ('tnode, 'tdist, 'tadjacent))
          .read
          .joinWithSmaller('tnode -> 'node, pipe)
          .groupAll {
            _.count(('min, 'tdist) -> ('count)) { p: (Int, Int) =>
              p._1 != p._2
            }
          }
          .write(Tsv(args("diff")))
      }
      pipe
    }

  /**
   * Decides whether another pass has to run.
   *
   * The next pass reads this pass's `output` as both `input` and `temp`, and writes to
   * this pass's `temp` path, so the two locations alternate between passes.
   *
   * @return a clone of this job for the next pass, or `None` once the diff written by
   *         this pass reports no changed distance
   */
  override def next: Option[Job] = {
    val nextArgs = args + ("input", Some(args("output"))) +
      ("temp", Some(args("output"))) +
      ("output", Some(args("temp"))) +
      ("iteration", Some((iteration + 1).toString))

    // The first pass writes no diff file, so it is always followed by another pass.
    // An empty diff file (no counted rows) is treated as "nothing changed" instead of
    // failing on `.head`.
    val distancesChanged =
      iteration <= 0 || Tsv(args("diff")).readAtSubmitter[Int].headOption.exists(_ > 0)

    if (distancesChanged) Some(clone(nextArgs)) else None
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment