Skip to content

Instantly share code, notes, and snippets.

@brewkode
Last active August 29, 2015 14:14
Show Gist options
Save brewkode/1cd1dca062cddc7be1c2 to your computer and use it in GitHub Desktop.
object UnionFind {

  /** Expands an edge `(a, b)` into both directions, `(a, b)` and `(b, a)`, so that each
    * endpoint sees the other as a neighbour after grouping by the first element.
    */
  def undirectedEdge(tuple: (String, String)): List[(String, String)] =
    tuple :: tuple.swap :: Nil

  /** Re-links one node's neighbourhood to its lexicographically smallest member.
    *
    * @param it the `(node, neighbour)` pairs of a single node, sorted by neighbour ascending
    *           (as arranged by `formNewRepresentatives`); must be non-empty, since
    *           `it.next()` throws otherwise
    * @return one `(representative, member, updated)` triple for every member of
    *         `{node} ∪ neighbours` other than the representative, where the representative is
    *         the smaller of the node and its smallest neighbour. `updated` is true only when the
    *         node is not itself the representative AND it had more than one `(node, neighbour)`
    *         pair.
    */
  def findRepresentative(it: Iterator[(String, String)]): Iterator[(String, String, Boolean)] = {
    val (node, smallestNeighbour) = it.next()
    val representative =
      if (node.compareTo(smallestNeighbour) <= 0) node else smallestNeighbour
    // Must be evaluated before `it` is drained by `members` below.
    val updated = it.nonEmpty && node != representative
    val members =
      (Iterator(node, smallestNeighbour) ++ it.map(_._2)).filter(_ != representative)
    members.map(member => (representative, member, updated))
  }

  /** Runs one union-find iteration and materializes the resulting edges.
    *
    * `stat` is incremented once per emitted triple whose `updated` flag is set, so the counter
    * read back by `unionFind` is zero exactly when no triple was flagged.
    *
    * @return an `Execution` yielding the distinct `(representative, member)` edges
    */
  def unionFindStep(stat: Stat, pairs: TypedPipe[(String, String)]): Execution[TypedPipe[(String, String)]] =
    formNewRepresentatives(pairs)
      .map { case triple @ (_, _, updated) =>
        if (updated) stat.inc
        triple
      }
      // The former `.debug` printed every tuple of every iteration to stdout; removed.
      .map { case (representative, member, _) => (representative, member) }
      .distinct
      .forceToDiskExecution

  /** Repeats `unionFindStep` until an iteration reports zero updates on the
    * "new-representatives" counter.
    *
    * @return an `Execution` yielding the number of iterations run and the final
    *         `(representative, member)` edges
    */
  def unionFind(pairs: TypedPipe[(String, String)]): Execution[(Int, TypedPipe[(String, String)])] = {
    val key = StatKey("new-representatives", "unionfind")

    def execute(stat: Stat, current: TypedPipe[(String, String)], iteration: Int): Execution[(Int, TypedPipe[(String, String)])] = {
      println(s"Running iteration $iteration")
      unionFindStep(stat, current)
        .getAndResetCounters
        .flatMap { case (newPairs, counters) =>
          // Read the counter once; it drives both the log line and the stop test.
          val newUpdates = counters(key)
          println(s"Completed iteration $iteration, # newUpdates: $newUpdates")
          // Explicit tuple instead of relying on argument-list auto-tupling.
          if (newUpdates == 0L) Execution.from((iteration, newPairs))
          else execute(stat, newPairs, iteration + 1)
        }
    }

    Execution.withId { implicit uid =>
      execute(Stat(key), pairs, 1)
    }
  }

  /** Groups both directions of every edge by source node, sorts each group by neighbour and
    * re-links it with `findRepresentative`.
    *
    * @return `(representative, member, updated)` triples as described on `findRepresentative`
    */
  def formNewRepresentatives(pairs: TypedPipe[(String, String)]): TypedPipe[(String, String, Boolean)] =
    pairs
      .flatMap(undirectedEdge)
      .groupBy(_._1)
      .sortBy(_._2)
      .mapValueStream(values => findRepresentative(values))
      .values
}
@brewkode
Copy link
Author

brewkode commented Feb 4, 2015

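Updated working code. `unionFindStep` now also returns the number of updated tuples, computed with a `sum` over the forced pipe. `unionFind` stops when that count is zero instead of reading the Stat counter: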

def unionFindStep(stat: Stat, pairs: TypedPipe[(String, String)]) = {
    // One union-find pass, written to disk so both derived executions below start from the
    // same materialized triples. `stat` is bumped for every triple flagged as updated.
    val forced = formNewRepresentatives(pairs)
      .map { case triple @ (_, _, updated) =>
        if (updated) stat.inc
        triple
      }
      .forceToDiskExecution

    // How many emitted triples carry `updated == true`; an empty sum counts as 0.
    val updateCount = forced.flatMap { materialized =>
      val flaggedOnes = materialized
        .filter { case (_, _, updated) => updated }
        .map(_ => 1L)
      flaggedOnes
        .sum[Long]
        .toOptionExecution
        .map(_.getOrElse(0L))
    }

    // The deduplicated (representative, member) edges that feed the next iteration.
    val nextEdges = forced.map { materialized =>
      materialized
        .map { case (representative, member, _) => (representative, member) }
        .distinct
    }

    Execution.zip(updateCount, nextEdges)
  }

  /** Repeats `unionFindStep` until an iteration emits no updated triples.
    *
    * @return an `Execution` yielding the number of iterations run and the final
    *         `(representative, member)` edges
    */
  def unionFind(pairs: TypedPipe[(String, String)]): Execution[(Int, TypedPipe[(String, String)])] = {
    val key = StatKey("new-representatives", "unionfind")

    def execute(stat: Stat, pairs: TypedPipe[(String, String)], iteration: Int): Execution[(Int, TypedPipe[(String, String)])] = {
      println(s"Running iteration $iteration")
      unionFindStep(stat, pairs)
        .getAndResetCounters
        .flatMap {
          // The counters are discarded; termination is decided by the update count that
          // unionFindStep returns alongside the new edges.
          case ((updateCount, newPairs), _) =>
            println(s"Completed iteration $iteration, # newUpdates: ${updateCount}")
            // Explicit tuple instead of relying on argument-list auto-tupling.
            if (updateCount == 0L) Execution.from((iteration, newPairs))
            else execute(stat, newPairs, iteration + 1)
        }
    }

    Execution.withId { implicit uid =>
      execute(Stat(key), pairs, 1)
    }
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment