// Gist 4185596 by @johnynek, created December 1, 2012:
// "Sketch of scala graph algorithm API"
// Run with: scala Graph.scala
// Prints each example graph and each algorithm's result, separated by "===========" lines.
// Takeaway:
// A small API is enough to express PageRank, connected components, and
// cosine (or, similarly, Jaccard) similarity.
// See the examples below.
import scala.annotation.tailrec
import scala.collection.parallel.immutable.ParIterable
/**
 * Immutable directed graph over nodes of type `N`, backed by an adjacency map.
 *
 * @tparam G the concrete graph type, returned by the operations that build a new graph
 * @tparam N the node type
 * @param m  node -> (out-neighbours, in-neighbours)
 */
abstract class Graph[G <: Graph[_, N], N](m: Map[N, (Set[N], Set[N])]) {
  // --- Operations each concrete graph must provide ---

  /** Returns a graph that also contains node `n`. */
  def +(n: N): G
  /** Returns a graph that also contains the directed edge `fromTo._1 -> fromTo._2`. */
  def +(fromTo: (N, N)): G
  /** Returns the graph with every edge direction flipped. */
  def reverse: G

  // --- Queries derived from the adjacency map ---

  /** (out-neighbours, in-neighbours) of `n`; both empty when `n` is not in the graph. */
  private def adjacency(n: N): (Set[N], Set[N]) =
    m.getOrElse(n, (Set.empty[N], Set.empty[N]))

  /** Number of nodes. */
  def size: Int = m.size

  /** Every node, in the adjacency map's iteration order. */
  def nodes: Iterator[N] = m.keysIterator

  /** Every node as a parallel collection, so per-node work can run concurrently. */
  def nodesPar: ParIterable[N] = m.par.map { case (node, _) => node }

  def contains(n: N): Boolean = m.isDefinedAt(n)

  /** Targets of the edges leaving `n` (empty for an unknown node). */
  def outEdges(n: N): Set[N] = adjacency(n)._1
  def outDegree(n: N): Int = outEdges(n).size

  /** Sources of the edges entering `n` (empty for an unknown node). */
  def inEdges(n: N): Set[N] = adjacency(n)._2
  def inDegree(n: N): Int = inEdges(n).size

  /** One "src -> dst" line per edge. */
  override def toString =
    m.flatMap { case (src, (outs, _)) =>
      outs.map { dst => src.toString + " -> " + dst.toString }
    }.mkString("\n")
}
/**
 * Immutable directed graph in which each node can carry a state value of type `S`.
 *
 * @param m adjacency map: node -> (out-neighbours, in-neighbours)
 * @param s node -> state; nodes whose state was never set are simply absent
 */
class GraphWithState[N,S] private (m: Map[N,(Set[N],Set[N])], s: Map[N,S])
  extends Graph[GraphWithState[N,S], N](m) {

  /** Adds `n` with no edges and no state; returns `this` if `n` is already present. */
  override def +(n: N): GraphWithState[N,S] =
    if (m.contains(n)) this
    else new GraphWithState[N,S](m + (n -> ((Set.empty[N], Set.empty[N]))), s)

  /**
   * Adds the directed edge `src -> dst`, creating either endpoint if needed.
   * States are left untouched, so newly created endpoints have no state.
   * A self-loop (`src == dst`) is recorded in both the out- and in-set of the node.
   */
  override def +(fromTo: (N,N)): GraphWithState[N,S] = {
    val (src, dst) = fromTo
    val noEdges = (Set.empty[N], Set.empty[N])
    val (srcOut, srcIn) = m.getOrElse(src, noEdges)
    val withOut = m + (src -> ((srcOut + dst, srcIn)))
    // Read dst from the map that already holds src's new out-edge: for a
    // self-loop (src == dst), reading the old map would drop that out-edge.
    val (dstOut, dstIn) = withOut.getOrElse(dst, noEdges)
    new GraphWithState(withOut + (dst -> ((dstOut, dstIn + src))), s)
  }

  /** The same nodes and states with every edge reversed. */
  override def reverse: GraphWithState[N,S] =
    new GraphWithState(m.map { case (n, (outs, ins)) => n -> ((ins, outs)) }, s)

  /** The state of `n`; throws NoSuchElementException if it was never set. */
  def state(n: N): S = s(n)

  /** Sets the state of one node (the node is not required to be in the graph). */
  def setState(ns: (N,S)): GraphWithState[N,S] =
    new GraphWithState(m, s + ns)

  /** Replaces all state with `st(n)` for every node `n`, possibly changing the state type. */
  def setState[S2](st: (N) => S2): GraphWithState[N,S2] =
    new GraphWithState(m, nodes.map { n => (n, st(n)) }.toMap)

  /** One "src -> dst" line per edge, then for every node a "(node)=state" line. */
  override def toString =
    m.flatMap { case (src, (outs, _)) =>
      // Nodes whose state was never set print "<unset>" instead of throwing.
      val stateText = s.get(src) match {
        case Some(st) => "" + st
        case None => "<unset>"
      }
      outs.map { dst => src.toString + " -> " + dst.toString }.iterator ++
        Iterator("(" + src + ")=" + stateText)
    }.mkString("\n")
}
object GraphWithState {
  /**
   * Implicit conversion from an edge list to a graph. No node state is set;
   * `S` is fixed by the call site (usually inferred), and the algorithms below
   * assign state with `setState` before reading it.
   */
  implicit def fromEdgeList[N,S](m: Iterable[(N,N)]): GraphWithState[N,S] =
    m.foldLeft(empty[N,S]) { (g, edge) => g + edge }

  /** A graph with no nodes, no edges and no state. */
  def empty[N,S]: GraphWithState[N,S] =
    new GraphWithState[N,S](Map.empty[N,(Set[N],Set[N])], Map.empty[N,S])

  /**
   * One synchronous step: calls `fn` for every node and applies the results.
   *
   * @param graph graph whose every node has a state
   * @param fn    called with (node, its state, in-neighbours with their states,
   *              out-neighbours with their states); Some(newState) updates the
   *              node, None leaves it unchanged
   * @return (whether any node was updated, the updated graph)
   */
  def algoStep[N,S](graph: GraphWithState[N,S])
      (fn: (N, S, Iterator[(N,S)], Iterator[(N,S)]) => Option[S]):
      (Boolean, GraphWithState[N,S]) = {
    // Every call to fn reads only the previous graph's states, so the calls are
    // independent and run in parallel (a map-reduce shaped step that could also
    // be written as a scalding job). Results are then folded in sequentially.
    val updates = graph.nodesPar.map { node =>
      (node, fn(node,
        graph.state(node),
        graph.inEdges(node).iterator.map { n => (n, graph.state(n)) },
        graph.outEdges(node).iterator.map { n => (n, graph.state(n)) }))
    }
    updates.foldLeft((false, graph)) { case ((changed, g), (node, newStateO)) =>
      newStateO match {
        case Some(news) => (true, g.setState(node -> news))
        case None => (changed, g)
      }
    }
  }

  /**
   * Repeats [[algoStep]] until a step updates no node, and returns that graph.
   * Does not terminate if `fn` keeps returning Some for some node.
   */
  @tailrec
  def algo[N,S](graph: GraphWithState[N,S])
      (fn: (N, S, Iterator[(N,S)], Iterator[(N,S)]) => Option[S]): GraphWithState[N,S] = {
    val (changed, newG) = algoStep(graph)(fn)
    if (changed) algo(newG)(fn) else newG
  }

  //////////////////////////////////
  // Here are some actual implementations of algorithms:
  //////////////////////////////////

  /**
   * Iterative PageRank. Each node's state is (mass, out-degree), starting from
   * mass 1.0. On each step a node's new mass is
   * `(1 - jumpProb) * sum(inNeighbourMass / inNeighbourOutDegree) + jumpProb`.
   * A node stops updating once its mass moves by less than `tolerance`.
   * Masses are not normalised to sum to 1.
   *
   * @param jumpProb  random-jump probability, in [0, 1] (default 0.1)
   * @param tolerance convergence threshold, must be positive (default 0.0001)
   */
  def pageRank[N](graph: GraphWithState[N,_],
      jumpProb: Double = 0.1,
      tolerance: Double = 0.0001): GraphWithState[N,(Double,Int)] = {
    require(jumpProb >= 0.0 && jumpProb <= 1.0, "jumpProb must be in [0, 1], got " + jumpProb)
    require(tolerance > 0.0, "tolerance must be positive, got " + tolerance)
    algo(graph.setState { n => (1.0, graph.outDegree(n)) }) {
      (_, prev, in, _) =>
        val (prevMass, outDegree) = prev
        // Each in-neighbour splits its mass over its out-edges; it has at least
        // one (the edge to this node), so the division cannot be by zero.
        val incoming = in.map { case (_, (mass, deg)) => mass / deg }.sum
        val newMass = incoming * (1.0 - jumpProb) + jumpProb
        if (scala.math.abs(prevMass - newMass) < tolerance) None
        else Some((newMass, outDegree))
    }
  }

  /**
   * Weakly connected components. Each node is labelled with the smallest node
   * (by `Ordering[N]`) reachable from it when edge direction is ignored.
   */
  def components[N:Ordering](graph: GraphWithState[N,_]): GraphWithState[N,N] = {
    val ord = implicitly[Ordering[N]]
    algo(graph.setState { n => n }) {
      (_, minNode, ins, outs) =>
        val newMin = (ins ++ outs).foldLeft(minNode) { (oldMin, ns) => ord.min(oldMin, ns._2) }
        if (newMin != minNode) Some(newMin) else None
    }
  }

  /**
   * Cosine similarity of out-neighbourhoods. Runs the [[CosineStateMachine]]
   * from [[InitState]] until every node stops changing.
   */
  def cosineOut(graph: GraphWithState[Int,_]): GraphWithState[Int,CosineStateMachine] = {
    algo[Int,CosineStateMachine](graph.setState { _ => InitState }) { (node, ns, ins, outs) =>
      ns.next(node, ins, outs)
    }
  }
}
/**
 * Per-node state machine for out-neighbourhood cosine similarity:
 * cosine(i, j) = <i, j> / sqrt(<i, i> <j, j>).
 */
sealed abstract class CosineStateMachine {
  /**
   * Advances this node by one synchronous step.
   *
   * @param id  this node's id
   * @param in  (id, state) of every in-neighbour
   * @param out (id, state) of every out-neighbour
   * @return the next state, or None when this node does not change
   */
  def next(
    id: Int,
    in: Iterator[(Int, CosineStateMachine)],
    out: Iterator[(Int, CosineStateMachine)]
  ): Option[CosineStateMachine]
}
/** Entry state of the cosine state machine: each node first records its own out-degree. */
object InitState extends CosineStateMachine {
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    val degree = out.length
    Some(InitialState(degree))
  }
}
/**
 * The node knows its out-degree. Its next step snapshots the (id, InitialState)
 * of every in-neighbour, so that out-neighbours can read them on the step after.
 */
case class InitialState(outdegree: Int) extends CosineStateMachine {
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    // Every in-neighbour must also be in InitialState during this step.
    val initialIns = in.map { case (neighbour, state) =>
      state match {
        case ready: InitialState => (neighbour, ready)
        case _ => sys.error("Invalid state")
      }
    }.toList
    Some(Step1State(this, initialIns))
  }
}
/*
 * The heavy-lift step. Each node holds, in its Step1State, its in-neighbours
 * with their out-degrees. Reading those lists from all of this node's
 * out-neighbours gives, for every node j that shares an out-neighbour with
 * this node, the number of shared out-neighbours (<i, j>) and j's out-degree
 * (<j, j>). The resulting map also contains this node itself, because it is an
 * in-neighbour of each of its own out-neighbours.
 */
case class Step1State(initState: InitialState, insInit: Iterable[(Int, InitialState)])
  extends CosineStateMachine {
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = {
    // One (coNeighbour, coNeighbourOutDegree) pair per shared out-neighbour.
    val sharedPairs = out.flatMap { case (_, state) =>
      state match {
        case Step1State(_, theirIns) =>
          theirIns.map { case (other, InitialState(otherOutDeg)) => (other, otherOutDeg) }
        case _ => sys.error("Invalid state after step1")
      }
    }.toList
    // Build a strict Map: `mapValues` returns a lazy view that re-runs the
    // function on every lookup (and in 2.13 is a deprecated MapView, not a Map).
    val overlaps: Map[Int, (Int, Int)] =
      sharedPairs.groupBy { _._1 }.map { case (other, pairs) =>
        // All pairs for `other` carry the same out-degree; the number of pairs
        // is the number of out-neighbours shared with it.
        other -> ((pairs.head._2, pairs.size))
      }
    Some(Step2State(initState, overlaps))
  }
}
/**
 * Final state of the cosine state machine.
 *
 * @param initState this node's InitialState (carries its out-degree)
 * @param m         other node -> (its out-degree, number of shared out-neighbours)
 */
case class Step2State(initState: InitialState, m: Map[Int, (Int, Int)]) extends
  CosineStateMachine {
  // Terminal state: never changes again.
  override def next(id: Int, in: Iterator[(Int,CosineStateMachine)],
      out: Iterator[(Int,CosineStateMachine)]) = None

  /** Cosine similarity with `otherId`; 0.0 when no out-neighbour is shared. */
  def cosine(otherId: Int): Double = m.get(otherId) match {
    case Some((degree, overlap)) =>
      overlap.toDouble / (scala.math.sqrt(degree) * scala.math.sqrt(initState.outdegree))
    case None => 0.0
  }
}
import GraphWithState._
// Demo: a small cyclic graph, its reverse, and PageRank over it.
val g = (empty[Int,Double] + (1 -> 2) + (2 -> 3) + (3 -> 1) + (2 -> 1)).setState(_ => 0.0)
println("===========")
println(g)
println("===========")
println(g.reverse)
println("===========")
println(pageRank(g))
println("===========")
// Connected components of an edge list (converted implicitly by fromEdgeList).
println(components(List(1 -> 2, 3 -> 2, 4 -> 0)))
println("===========")
// Cosine similarity of out-neighbourhoods: 1 -> {2, 4}, 3 -> {2}.
val cs = cosineOut(List(1 -> 2, 3 -> 2, 1 -> 4))
println(cs)
// Match instead of casting, so an unfinished run fails with a clear message.
cs.state(1) match {
  case finished: Step2State => println(finished.cosine(3))
  case other => sys.error("cosineOut did not finish; node 1 is in state " + other)
}
// End of gist.