This Scala code benchmarks three ways of computing the Euclidean distance between two RDDs: the classic map-reduce pattern (map + reduce), treeReduce, and treeAggregate.
import org.apache.commons.lang.SystemUtils
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.math.sqrt
/**
  * Created by Umberto on 08/02/2017.
  */
object TestPerformance {

  def main(args: Array[String]) {

    // Per-run elapsed times (in ms) for each of the three strategies
    val mapReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeAggregateTimeArr: Array[Double] = Array.ofDim(20)

    // Set Windows system property
    if (SystemUtils.IS_OS_WINDOWS) {
      System.setProperty("hadoop.home.dir", "c:/winutil/")
    }

    // Spark setup
    val config = new SparkConf().setAppName("TestStack").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(config)
    val sql: SQLContext = new SQLContext(sc)
    // Generate two random double RDDs, each containing 1 million i.i.d. values drawn from the
    // standard normal distribution `N(0, 1)`, evenly distributed in 5 partitions.
    val input1 = normalRDD(sc, 1000000L, 5)
    val input2 = normalRDD(sc, 1000000L, 5)
    //val input1 = sc.parallelize(List[Double](1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
    //val input2 = sc.parallelize(List[Double](1.0, 2.0, 3.0, 4.0, 3.0, 7.0))

    val xy = input1.zip(input2).cache()
    // To materialize the RDD
    xy.count()
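    // Caching and counting up front forces the zipped RDD to be computed once, so the
    // timed loops below measure only the distance computation, not RDD generation.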
    for (i <- 0 until 20) {
      val t1 = System.nanoTime()
      val euclideanDistanceMapRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.reduce(_ + _))
      val t11 = System.nanoTime()
      println("Map-Reduce - Euclidean Distance " + euclideanDistanceMapRed)
      mapReduceTimeArr(i) = (t11 - t1) / 1000000.0
      println("Map-Reduce - Elapsed time: " + (t11 - t1) / 1000000.0 + "ms")
    }
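    // treeReduce computes the same sum, but combines partition results in a multi-level
    // tree on the executors instead of sending every partial result straight to the driver.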
    for (i <- 0 until 20) {
      val t2 = System.nanoTime()
      val euclideanDistanceTreeRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.treeReduce(_ + _))
      val t22 = System.nanoTime()
      println("TreeReduce - Euclidean Distance " + euclideanDistanceTreeRed)
      treeReduceTimeArr(i) = (t22 - t2) / 1000000.0
      println("TreeReduce - Elapsed time: " + (t22 - t2) / 1000000.0 + "ms")
    }
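    // treeAggregate folds the squared difference directly in seqOp, so it skips the
    // intermediate mapped RDD while still combining partial sums tree-wise across executors.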
    for (i <- 0 until 20) {
      val t3 = System.nanoTime()
      val euclideanDistanceTreeAggr = sqrt(xy.treeAggregate(0.0)(
        seqOp = (c, v) => c + (v._1 - v._2) * (v._1 - v._2),
        combOp = (c1, c2) => c1 + c2
      ))
      val t33 = System.nanoTime()
      println("TreeAggregate - Euclidean Distance " + euclideanDistanceTreeAggr)
      treeAggregateTimeArr(i) = (t33 - t3) / 1000000.0
      println("TreeAggregate - Elapsed time: " + (t33 - t3) / 1000000.0 + "ms")
    }
    val mapReduceAvgTime = mapReduceTimeArr.sum / mapReduceTimeArr.length
    val treeReduceAvgTime = treeReduceTimeArr.sum / treeReduceTimeArr.length
    val treeAggregateAvgTime = treeAggregateTimeArr.sum / treeAggregateTimeArr.length

    val mapReduceMinTime = mapReduceTimeArr.min
    val treeReduceMinTime = treeReduceTimeArr.min
    val treeAggregateMinTime = treeAggregateTimeArr.min

    val mapReduceMaxTime = mapReduceTimeArr.max
    val treeReduceMaxTime = treeReduceTimeArr.max
    val treeAggregateMaxTime = treeAggregateTimeArr.max

    println("Map-Reduce - Avg: " + mapReduceAvgTime + "ms " + "Max: " + mapReduceMaxTime + "ms " + "Min: " + mapReduceMinTime + "ms")
    println("TreeReduce - Avg: " + treeReduceAvgTime + "ms " + "Max: " + treeReduceMaxTime + "ms " + "Min: " + treeReduceMinTime + "ms")
    println("TreeAggregate - Avg: " + treeAggregateAvgTime + "ms " + "Max: " + treeAggregateMaxTime + "ms " + "Min: " + treeAggregateMinTime + "ms")
  }
}
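Note: both treeReduce and treeAggregate also accept an optional depth parameter (default 2) that controls how many levels of intermediate aggregation are used before results reach the driver. The snippet below is a minimal sketch, not part of the benchmark above: it assumes the same xy RDD is in scope, and depth = 4 is an arbitrary example value chosen only to illustrate the call shape.

// Sketch: same distance via treeAggregate, with an explicit tree depth (assumes xy from above).
val euclideanDistanceDeeperTree = sqrt(xy.treeAggregate(0.0)(
  seqOp = (c, v) => c + (v._1 - v._2) * (v._1 - v._2),
  combOp = (c1, c2) => c1 + c2,
  depth = 4
))
println("TreeAggregate (depth = 4) - Euclidean Distance " + euclideanDistanceDeeperTree)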