Test Implementation of Conjugate Gradient for a Linear Problem
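This is the textbook conjugate gradient recurrence for a symmetric positive-definite system Ax = b: the matrix-vector product A*p runs as a distributed Spark job, while the cheap O(n) vector updates run locally on the driver. For reference, the standard recurrences the loop below implements (in the code, rs0/rs1 hold the squared residual norms and rat plays the role of beta_k) are:

    \alpha_k = \frac{r_k^\top r_k}{p_k^\top A p_k}
    x_{k+1}  = x_k + \alpha_k p_k
    r_{k+1}  = r_k - \alpha_k A p_k
    \beta_k  = \frac{r_{k+1}^\top r_{k+1}}{r_k^\top r_k}
    p_{k+1}  = r_{k+1} + \beta_k p_k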
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
// Ensure a properly ordered local array from a distributed RDD.
// (Int, Double) corresponds to (index, value), where index gives the proper order.
def array_from_rdd(rdd: RDD[(Int, Double)]): Array[Double] =
  rdd.sortBy(_._1).map(_._2).collect()

def vec_from_rdd(rdd: RDD[(Int, Double)]): Vector =
  Vectors.dense(array_from_rdd(rdd))
// dot product of an IndexedRow (treated as sparse) with a local vector
def dot_irv(ir: IndexedRow, v2: Vector): Double = {
  val vs1 = ir.vector.toSparse
  // multiply each stored (index, value) pair against v2; an empty row sums to 0.0
  vs1.indices.zip(vs1.values).map { case (i, v) => v * v2(i) }.sum
}
// sum of squared values, i.e. the squared 2-norm of the distributed vector
def sqr_rdd(vd: RDD[(Int, Double)]): Double =
  vd.map(e => e._2 * e._2).reduce(_ + _)

// rdd-vector dot product
def dot_rv(rdd: RDD[(Int, Double)], vec: Vector): Double =
  rdd.map(e => vec(e._1) * e._2).reduce(_ + _)
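A quick sanity check of these helpers (a hypothetical spark-shell snippet, assuming a SparkContext `sc` is in scope; the values are illustrative only):

    val rdd = sc.parallelize(Seq((2, 3.0), (0, 1.0), (1, 2.0)))
    array_from_rdd(rdd)                        // Array(1.0, 2.0, 3.0): sorted by index
    sqr_rdd(rdd)                               // 1 + 4 + 9 = 14.0
    dot_rv(rdd, Vectors.dense(1.0, 1.0, 1.0))  // 1 + 2 + 3 = 6.0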
def conjgrad(mat: IndexedRowMatrix, b: Vector, x: Vector, max_iter: Int): Vector = {
  val MaxIter = max_iter
  val Tol = 1e-5
  val n = x.size
  // NOTE: toArray on a DenseVector exposes its backing array, so xa aliases x
  // and the initial guess is updated in place as the solve proceeds
  val xa = x.toArray
  // initial residual r = b - A*x as a distributed RDD[(index, value)];
  // cached because it is consumed three times below
  val rd = mat.rows.map(ir => (ir.index.toInt, b(ir.index.toInt) - dot_irv(ir, x))).cache
  val ra = array_from_rdd(rd)  // local copy of the residual
  val p = vec_from_rdd(rd)     // search direction, another local copy
  var rs0 = sqr_rdd(rd)        // squared residual norm r'r
  rd.unpersist()
  var rs1 = rs0
  var niter = 0
  var converged = false
  println("Starting conjugate gradient: %d elements (max_iter = %d)".format(n, MaxIter))
  val t0 = System.currentTimeMillis
  while (!converged && niter < MaxIter) {
    niter += 1
    // distributed matrix-vector product q = A*p
    val qd = mat.rows.map(ir => (ir.index.toInt, dot_irv(ir, p))).cache
    val q = vec_from_rdd(qd)  // keep q a vector as it's immutable
    val alpha = rs0 / dot_rv(qd, p)
    qd.unpersist()  // release the cached product; it would otherwise accumulate every iteration
    rs1 = 0.0
    for (i <- 0 until n) {
      xa(i) += alpha * p(i)  // x += alpha * p
      ra(i) -= alpha * q(i)  // r -= alpha * q
      rs1 += ra(i) * ra(i)
    }
    val ttol = math.sqrt(rs1)
    if (niter % 10 == 0) {
      val tt = (System.currentTimeMillis - t0) * 1e-3
      println("  iteration %d : resid = %e (%e tolerance) : %.3f s per iteration".format(niter, ttol, Tol, tt / niter))
    }
    if (ttol < Tol) {
      converged = true
    }
    val rat = rs1 / rs0  // beta
    // p = r + beta * p; writing through pa updates the p vector itself,
    // since toArray on a DenseVector returns its backing array
    val pa = p.toArray
    for (i <- 0 until n) {
      pa(i) = pa(i) * rat + ra(i)
    }
    rs0 = rs1
  }
  println(s"Done! Converged: $converged")
  val tt = (System.currentTimeMillis - t0) * 1e-3
  println("%d iterations in %.1f seconds : avg of %.3f s per iteration".format(niter, tt, tt / niter))
  Vectors.dense(xa)
}
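A minimal usage sketch, again hypothetical and assuming a spark-shell session with `sc` defined: solving the small symmetric positive-definite system A = [[4, 1], [1, 3]], b = [1, 2], whose exact solution is (1/11, 7/11).

    val A = new IndexedRowMatrix(sc.parallelize(Seq(
      IndexedRow(0L, Vectors.dense(4.0, 1.0)),
      IndexedRow(1L, Vectors.dense(1.0, 3.0))
    )))
    val b  = Vectors.dense(1.0, 2.0)
    val x0 = Vectors.dense(0.0, 0.0)  // initial guess; note conjgrad updates it in place
    val x  = conjgrad(A, b, x0, 100)
    println(x)                        // expect roughly [0.0909, 0.6364]

The design keeps x, r, and p local on the driver and only distributes the matrix-vector product, which fits problems where A is too large for one machine but n-length vectors still fit in driver memory.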