Skip to content

Instantly share code, notes, and snippets.

@animeshtrivedi
Created May 4, 2017 11:10
Show Gist options
  • Save animeshtrivedi/c3cdfc5d022c38f90d75430cf5f5f397 to your computer and use it in GitHub Desktop.
Save animeshtrivedi/c3cdfc5d022c38f90d75430cf5f5f397 to your computer and use it in GitHub Desktop.
A broadcast stress test for Spark.
/*
* Broadcast stress test for Spark
*
* Author: Animesh Trivedi <atr@zurich.ibm.com>
*
* Copyright (C) 2017, IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import org.apache.spark.broadcast.Broadcast
import scala.util.Random
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.io._
import java.io.File
import java.io.FileOutputStream
import java.lang.Long
object BroadcastTest {

  /**
   * Broadcast stress test.
   *
   * For each of `itr` iterations this method:
   *  1. creates `num` broadcast variables, one per pre-generated array of `size`
   *     random bytes, recording the driver-side latency of every `sc.broadcast` call;
   *  2. runs a job with `tasks` partitions in which every task reads the value of
   *     all `num` broadcast variables and adds their total length to a long accumulator;
   *  3. records the wall-clock time of steps 1 and 2 together.
   *
   * Afterwards it prints a per-iteration summary and per-broadcast averages in
   * microseconds, then writes every individual `sc.broadcast` latency to `latencyFile`.
   *
   * NOTE(review): `sc` is not defined in this file; it is presumably the SparkContext
   * bound by spark-shell. Confirm before running this code elsewhere.
   *
   * @param tasks       number of partitions (tasks) in each read job; must be > 0
   * @param num         number of broadcast variables created per iteration; must be >= 0
   * @param size        size in bytes of each broadcast array; must be >= 0
   * @param itr         number of iterations; must be >= 0
   * @param latencyFile output path; one line per broadcast call:
   *                    `<index> <latency in usec> usec`
   * @throws IllegalArgumentException if any argument is out of range, or if
   *                                  `num * itr` exceeds `Int.MaxValue`
   */
  def test(tasks: Int = 10, num: Int = 10000, size: Int = 4096, itr: Int = 4,
           latencyFile: String = "./latencies.data"): Unit = {
    require(tasks > 0, s"tasks must be positive, got $tasks")
    require(num >= 0 && size >= 0 && itr >= 0,
      s"num, size and itr must be non-negative, got num=$num size=$size itr=$itr")
    // Computed in Long so that a large num * itr is rejected instead of silently wrapping.
    val totalBroadcasts = num.toLong * itr
    require(totalBroadcasts <= Int.MaxValue, s"num * itr is too large: $totalBroadcasts")

    val random = new Random()
    val sb = new StringBuilder
    // Payloads are generated once, up front, so that only broadcast and read costs are timed.
    val toBroadcastArr: Array[Array[Byte]] = Array.fill(num) {
      val bytes = new Array[Byte](size)
      random.nextBytes(bytes)
      bytes
    }
    val accuLong = sc.longAccumulator("counter")
    val broadcastVariableArr = new Array[Broadcast[Array[Byte]]](num)
    // Spelled scala.Long on purpose: this file imports java.lang.Long, which would
    // otherwise make this a boxed array initialised with nulls.
    val writeTimeStamps = new Array[scala.Long](totalBroadcasts.toInt)
    var sumRunTime = 0L
    var sumWriteTime = 0L
    for (i <- 0 until itr) {
      println("=========== iteration " + i)
      val startTime = System.nanoTime
      for (a <- 0 until num) {
        val s = System.nanoTime()
        broadcastVariableArr(a) = sc.broadcast(toBroadcastArr(a))
        writeTimeStamps(i * num + a) = System.nanoTime() - s
      }
      sumWriteTime += System.nanoTime - startTime
      // Every task reads all broadcast values. The accumulator is updated inside an
      // action (foreach) rather than a transformation (map): Spark only guarantees
      // exactly-once accumulator updates for actions. Updates made in transformations
      // may be re-applied when a task is retried or speculatively re-executed.
      // NOTE(review): broadcasts from earlier iterations are never unpersisted or
      // destroyed here; they are left to Spark's own cleanup. Confirm this is intended.
      sc.parallelize(1 to tasks, tasks).foreach { _ =>
        var sizeSum = 0L
        for (a <- 0 until num) {
          sizeSum += broadcastVariableArr(a).value.length
        }
        accuLong.add(sizeSum)
      }
      val runX = System.nanoTime - startTime
      sumRunTime += runX
      sb.append("Iteration %d took %.0f milliseconds, value %d \n".format(i, runX / 1E6, accuLong.value))
      accuLong.reset()
    }
    println(sb.mkString)
    // Per-broadcast average in microseconds. Long arithmetic avoids Int overflow,
    // and the zero guard avoids dividing by zero when num or itr is 0.
    def avgMicros(totalNanos: scala.Long): scala.Long =
      if (totalBroadcasts == 0) 0L else totalNanos / (1000L * totalBroadcasts)
    println(" on average it takes : " + avgMicros(sumRunTime) +
      " usecs, writeBroadcast is around : " + avgMicros(sumWriteTime))
    writeLatencies(latencyFile, writeTimeStamps)
    println("wrote the latency file as well ")
  }

  /**
   * Writes one line per entry of `latenciesNanos` to `path`, in the form
   * `<index> <latency in usec> usec`. Latencies are converted from nanoseconds to
   * microseconds by integer division.
   * The writer is closed even if a write fails.
   */
  private def writeLatencies(path: String, latenciesNanos: Array[scala.Long]): Unit = {
    val bw = new BufferedWriter(new FileWriter(new File(path)))
    try {
      for (idx <- latenciesNanos.indices) {
        bw.write(s"$idx ${latenciesNanos(idx) / 1000} usec \n")
      }
    } finally {
      bw.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment