Skip to content

Instantly share code, notes, and snippets.

@ashwanthkumar
Created June 12, 2014 16:53
Show Gist options
  • Save ashwanthkumar/0b626746ddf119968556 to your computer and use it in GitHub Desktop.
Save ashwanthkumar/0b626746ddf119968556 to your computer and use it in GitHub Desktop.
User Identifier Normalization from the Big Data book by Nathan Marz, implemented in Scalding.
import com.twitter.scalding.{Tsv, Job, Args}
import scala.collection.immutable.TreeSet
/*
Let's assume we are reading a file of format (a,b), where (a,b) denotes that node a and node b are connected in a graph.
For simplicity we will assume that a and b are ints. We want to find the mapping of all the nodes once a fixed point is reached.
*/
/**
 * One iteration of a fixed-point computation over an edge list; further
 * iterations are chained through [[next]].
 *
 * Arguments read by this job:
 *  - `input`: Tsv of edge pairs, read as fields `('n1, 'n2)`
 *  - `output-base-dir`: this run writes to `<output-base-dir>/run-<iteration>`
 *  - `progress`: Tsv sink that receives the rows flagged as new
 *  - `iteration`: optional run counter, defaults to 1
 */
class FixedPointJob(args: Args) extends Job(args) {
  val input = args("input")
  val outputBaseDir = args("output-base-dir")
  val progressSink = args("progress")
  val iteration = args.getOrElse("iteration", "1").toInt
  val sink = outputBaseDir + "/run-" + iteration

  /** Emits the edge in both directions so each endpoint sees the other as a neighbour. */
  def bidirectionalEdge(tuple: (Int, Int)) = tuple match {
    case (node1, node2) => Iterable((node1, node2), (node2, node1))
  }

  /**
   * Processes the stream of `(grouping node, neighbour)` pairs of one group.
   *
   * Collects the grouping node together with every neighbour, picks the
   * smallest id, and emits `(smallest, other, progress)` for each remaining id.
   * `progress` is true when the group holds more than two ids and the grouping
   * node is not itself the smallest one.
   */
  def iterateEdges(edges: Iterator[(Int, Int)]) = {
    // The first pair supplies the grouping node; the rest of the stream only adds neighbours.
    val (grouped, first) = edges.next()
    val allIds = TreeSet(grouped, first) ++ edges.map(_._2)
    val smallest = allIds.head
    val progress = allIds.size > 2 && grouped != smallest
    for (id <- allIds - smallest) yield (smallest, id, progress)
  }

  // Edges rewritten for this iteration, each carrying its 'isNew flag.
  val iterationSoFar = {
    Tsv(input, fields = ('n1, 'n2))
      .flatMapTo(('n1, 'n2) -> ('b1, 'b2))(bidirectionalEdge)
      .groupBy('b2) { group =>
        group.mapStream(('b2, 'b1) -> ('node1, 'node2, 'isNew))(iterateEdges)
      }
      .project(('node1, 'node2, 'isNew))
  }

  // Only the rows flagged as new go to the progress sink; `next` checks it for emptiness.
  iterationSoFar
    .filter('isNew) { isNew: Boolean => isNew }
    .write(Tsv(progressSink))

  // The de-duplicated edge list of this run becomes the input of the next run.
  iterationSoFar
    .distinct(('node1, 'node2))
    .write(Tsv(sink))

  /**
   * Schedules another iteration while the progress sink is non-empty.
   *
   * The follow-up job reads this run's sink as its `input` and gets an
   * incremented `iteration`. An `output` argument is passed along as well,
   * although this class itself never reads an argument with that name.
   */
  override def next: Option[Job] = {
    val nextIteration = iteration + 1
    val nextArgs = args + ("input", Some(sink)) +
      ("output", Some(outputBaseDir + "/run-" + nextIteration)) +
      ("iteration", Some(nextIteration.toString))

    if (Tsv(progressSink).readAtSubmitter[(Int, Int)].isEmpty) None
    else Some(clone(nextArgs))
  }
}
import org.scalatest.FunSuite
import com.twitter.scalding.{FieldConversions, TupleConversions, Tsv, JobTest}
import scala.collection.mutable
import org.scalatest.matchers.ShouldMatchers
/**
 * Drives FixedPointJob through JobTest on a small example graph and checks
 * the edges written to each `output-base-dir/run-N` sink.
 */
class FixedPointJobTest extends FunSuite with TupleConversions with FieldConversions with ShouldMatchers {

  test("should work for the example in the book") {
    val initialEdges = List(
      (1,4), (4,3), (4,5), (5,2), (5,11)
    )
    val edgesAfterRun1 = List(
      (1,4), (1,3), (1,5), (3,4), (2,4), (2,5), (2, 11), (5, 11)
    )
    val edgesAfterRun2 = List(
      (1,3), (1,4), (1,5), (1,11), (1,2), (2,5), (2, 4), (2,11)
    )
    val fixedPointEdges = List(
      (1,2), (1,3), (1,4), (1,5), (1,11)
    )

    // Asserts that every edge in the sink buffer appears in the expected list.
    // It does not require every expected edge to be present in the buffer.
    def validateOutput(runString: String, expectedGraph: List[(Int, Int)])(buffer: mutable.Buffer[(Int, Int)]): Unit = {
      println(runString)
      buffer.count(edge => !expectedGraph.contains(edge)) should be(0)
    }

    JobTest(new FixedPointJob(_))
      .arg("input", "input")
      .arg("output-base-dir", "output-base-dir")
      .arg("progress", "progress")
      .arg("iteration", "1")
      // Each run's output path is also registered as a source, since the job feeds its sink to the next run as input.
      .source(Tsv("input", fields = ('n1, 'n2)), initialEdges)
      .source(Tsv("output-base-dir/run-1", fields = ('n1, 'n2)), edgesAfterRun1)
      .source(Tsv("output-base-dir/run-2", fields = ('n1, 'n2)), edgesAfterRun2)
      .source(Tsv("output-base-dir/run-3", fields = ('n1, 'n2)), fixedPointEdges)
      .source(Tsv("output-base-dir/run-4", fields = ('n1, 'n2)), Iterable())
      .sink(Tsv("output-base-dir/run-1"))(validateOutput("Validating Run 1", edgesAfterRun1))
      .sink(Tsv("output-base-dir/run-2"))(validateOutput("Validating Run 2", edgesAfterRun2))
      .sink(Tsv("output-base-dir/run-3"))(validateOutput("Validating Run 3", fixedPointEdges))
      .sink(Tsv("output-base-dir/run-4"))(validateOutput("Validating Run 4", fixedPointEdges))
      .sink(Tsv("progress"))(doNothing)
      .run
      .finish
  }

  /** Sink callback that intentionally performs no checks (used for the progress sink). */
  def doNothing(buffer: mutable.Buffer[(Int, Int)]): Unit = {}
}
@ashwanthkumar
Copy link
Author

Reference - Page #167 on Big Data MEAP v18 (PDF) or Chapter 6.6 on ePub.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment