// Crail benchmark tests as a Spark program: createFile writes one file per executor
// under /test, readFileHeap reads it back through a buffered stream, and
// readFileDirect reads it back through the asynchronous direct-stream API.
// Intended to be pasted into spark-shell, where the SparkContext `sc` is in scope.
import java.nio.ByteBuffer
import java.util.Random
import java.util.concurrent.Future
import com.ibm.crail._
import com.ibm.crail.conf.CrailConfiguration
import com.ibm.crail.memory.OffHeapBuffer
object CrailTest extends Serializable {

  private val testDir = "/test"

  def createFile(numWriters: Int = 10, size: Long = 1048576L): Unit = {
    val crailConf = new CrailConfiguration()
    val fs = CrailFS.newInstance(crailConf)
    val testDirExists: Boolean = fs.lookup(testDir).get() != null
    if (testDirExists) {
      // the directory is left over from a previous run: delete it and create it again
      fs.delete(testDir, true).get().syncDir()
    }
    // the test directory is created once, on the driver
    fs.create(testDir, CrailNodeType.DIRECTORY, 0, 0).get().syncDir()
    sc.parallelize(0 until numWriters, numWriters).foreachPartition { _ =>
      import org.apache.spark._
      // each worker opens its own file-system client
      val crailConf = new CrailConfiguration()
      val fs = CrailFS.newInstance(crailConf)
      // one file per executor; this assumes one task per executor, otherwise file names collide
      val id = SparkEnv.get.executorId
      val file = fs.create(testDir + "/" + id, CrailNodeType.DATAFILE, 0, 0).get().syncDir()
      val output = file.asFile().getBufferedOutputStream(0)
      // size is a Long, so write it out in chunks of at most 1 MB
      val arr = new Array[Byte](1024 * 1024)
      val random = new Random(System.nanoTime())
      random.nextBytes(arr)
      val bb = ByteBuffer.wrap(arr)
      var dueSize = size
      while (dueSize > 0) {
        val curSize = Math.min(arr.length, dueSize).toInt
        bb.clear()
        bb.position(curSize)
        bb.flip()
        // bb now covers [0, curSize) and is ready to be written
        output.write(bb)
        dueSize -= curSize
      }
      output.close()
      fs.close()
    }
  }
  def readFileHeap(numReaders: Int = 10, bufSize: Int = 1024 * 1024, preferDirect: Boolean = true): Unit = {
    sc.parallelize(0 until numReaders, numReaders).foreachPartition { _ =>
      import org.apache.spark._
      // each worker opens its own file-system client
      val crailConf = new CrailConfiguration()
      val fs = CrailFS.newInstance(crailConf)
      // look up the file this executor wrote in createFile
      val id = SparkEnv.get.executorId
      val file = fs.lookup(testDir + "/" + id).get().asFile()
      val input = file.getBufferedInputStream(file.getCapacity())
      val bb = if (preferDirect) {
        ByteBuffer.allocateDirect(bufSize)
      } else {
        ByteBuffer.allocate(bufSize)
      }
      var read = 0
      var ops: Long = 0
      while (read >= 0) {
        bb.clear()
        read = input.read(bb)
        // count only reads that returned data, not the terminating -1
        if (read >= 0) ops += 1
      }
      System.err.println(" --------> read " + (testDir + "/" + id) + " in " + ops + " operations")
      input.close()
      fs.close()
    }
  }
  // asyncOps is accepted for symmetry with the asynchronous API but is not used here:
  // reads are issued one at a time and each future is waited on immediately
  def readFileDirect(numReaders: Int = 10, bufSize: Int = 1024 * 1024, asyncOps: Int = 1): Unit = {
    sc.parallelize(0 until numReaders, numReaders).foreachPartition { _ =>
      import org.apache.spark._
      // each worker opens its own file-system client
      val crailConf = new CrailConfiguration()
      val fs = CrailFS.newInstance(crailConf)
      // look up the file this executor wrote in createFile
      val id = SparkEnv.get.executorId
      val file = fs.lookup(testDir + "/" + id).get().asFile()
      val input = file.getDirectInputStream(file.getCapacity())
      val bb = OffHeapBuffer.wrap(ByteBuffer.allocateDirect(bufSize))
      var read = 0L
      var ops: Long = 0
      while (read >= 0) {
        bb.clear()
        // the direct stream is asynchronous: read() returns a future, or null at end of stream
        val fx = input.read(bb)
        if (fx == null) {
          System.err.println(" ^^^^^^^ hit NULL ")
          read = -1
        } else {
          // wait for the read to complete; count only reads that returned data
          read = fx.get().getLen
          ops += 1
        }
      }
      System.err.println(" --------> read " + (testDir + "/" + id) + " in " + ops + " operations")
      input.close()
      fs.close()
    }
  }
}
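
Usage sketch (not part of the original gist): assuming the object above has been pasted into spark-shell with :paste, the Crail client jars are on the driver and executor classpaths, and each node can reach a valid crail-site.conf, the three benchmarks can be driven as below. The sizes and counts are illustrative, not measured settings from the gist.

// run from the spark-shell prompt; `sc` is the shell's SparkContext
CrailTest.createFile(numWriters = 10, size = 1024L * 1024 * 1024)  // each writer streams 1 GiB into /test/<executorId>
CrailTest.readFileHeap(numReaders = 10, bufSize = 1024 * 1024)     // buffered reads into a direct ByteBuffer
CrailTest.readFileDirect(numReaders = 10, bufSize = 1024 * 1024)   // one-at-a-time async reads via the direct stream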