Created
May 5, 2017 14:58
-
-
Save animeshtrivedi/e7bdaeef37ab98d5abe69e8e60ef589f to your computer and use it in GitHub Desktop.
Crail benchmark tests as a Spark program
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer | |
import java.util.Random | |
import java.util.concurrent.Future | |
import com.ibm.crail._ | |
import com.ibm.crail.conf.CrailConfiguration | |
import com.ibm.crail.memory.OffHeapBuffer | |
/**
 * Crail storage benchmark helpers intended to be run from a Spark shell.
 *
 * NOTE(review): assumes a `SparkContext` named `sc` is already in scope
 * (as in spark-shell); this object does not create one itself.
 * Each worker partition creates its own CrailFS instance, writes or reads
 * one file named after its executor id under [[testDir]], and closes the
 * file system when done.
 */
object CrailTest extends Serializable {

  /** All benchmark files live under this Crail directory. */
  private val testDir = "/test"

  /**
   * Creates one Crail file per writer partition and fills it with random bytes.
   *
   * @param numWriters number of Spark partitions (one file per executor id)
   * @param size       number of bytes to write into each file
   */
  def createFile(numWriters: Int = 10, size: Long = 1048576L): Unit = {
    // Driver-side setup: recreate the test directory so each run starts clean.
    val fs = CrailFS.newInstance(new CrailConfiguration())
    try {
      if (fs.lookup(testDir).get() != null) {
        // Directory exists from a previous run: delete it and create it again.
        fs.delete(testDir, true).get().syncDir()
      }
      fs.create(testDir, CrailNodeType.DIRECTORY, 0, 0).get().syncDir()
    } finally {
      fs.close() // fix: the driver-side file system instance was previously leaked
    }
    sc.parallelize(0 until numWriters, numWriters).foreachPartition { _ =>
      import org.apache.spark.SparkEnv
      // Each worker gets its own Crail file system instance.
      val workerFs = CrailFS.newInstance(new CrailConfiguration())
      val id = SparkEnv.get.executorId
      val file = workerFs.create(testDir + "/" + id, CrailNodeType.DATAFILE, 0, 0).get().syncDir()
      val output = file.asFile().getBufferedOutputStream(0)
      // One reusable 1 MB buffer of random data; `size` need not be a multiple of it.
      val chunkBytes = new Array[Byte](1024 * 1024)
      new Random(System.nanoTime()).nextBytes(chunkBytes)
      val bb = ByteBuffer.wrap(chunkBytes)
      var remaining = size
      while (remaining > 0) {
        val chunk = Math.min(chunkBytes.length.toLong, remaining).toInt
        // Expose exactly `chunk` readable bytes (position = 0, limit = chunk);
        // equivalent to the original clear/position/flip sequence, but clearer.
        bb.clear()
        bb.limit(chunk)
        output.write(bb)
        remaining -= chunk
      }
      output.close()
      workerFs.close()
    }
  }

  /**
   * Reads every benchmark file through Crail's buffered input stream and
   * reports the number of successful read operations per file on stderr.
   *
   * @param numReaders   number of Spark partitions (one file per executor id)
   * @param bufSize      read buffer size in bytes
   * @param preferDirect use an off-heap (direct) ByteBuffer when true
   */
  def readFileHeap(numReaders: Int = 10, bufSize: Int = (1024 * 1024), preferDirect: Boolean = true): Unit = {
    sc.parallelize(0 until numReaders, numReaders).foreachPartition { _ =>
      import org.apache.spark.SparkEnv
      val fs = CrailFS.newInstance(new CrailConfiguration())
      val id = SparkEnv.get.executorId
      val file = fs.lookup(testDir + "/" + id).get().asFile()
      val input = file.getBufferedInputStream(file.getCapacity())
      val bb =
        if (preferDirect) ByteBuffer.allocateDirect(bufSize)
        else ByteBuffer.allocate(bufSize)
      var ops: Long = 0
      var read = 0
      while (read >= 0) {
        bb.clear()
        read = input.read(bb)
        // fix: do not count the terminal EOF read (read < 0) as an operation
        if (read >= 0) ops += 1
      }
      System.err.println(" --------> read " + (testDir + "/" + id) + " in " + ops + " operations")
      input.close()
      fs.close()
    }
  }

  /**
   * Reads every benchmark file through Crail's direct input stream, blocking
   * on each read future, and reports operation counts per file on stderr.
   *
   * @param numReaders number of Spark partitions (one file per executor id)
   * @param bufSize    read buffer size in bytes
   * @param asyncOps   kept for interface compatibility; currently unused —
   *                   reads are issued and completed one at a time
   */
  def readFileDirect(numReaders: Int = 10, bufSize: Int = (1024 * 1024), asyncOps: Int = 1): Unit = {
    sc.parallelize(0 until numReaders, numReaders).foreachPartition { _ =>
      import org.apache.spark.SparkEnv
      val fs = CrailFS.newInstance(new CrailConfiguration())
      val id = SparkEnv.get.executorId
      val file = fs.lookup(testDir + "/" + id).get().asFile()
      val input = file.getDirectInputStream(file.getCapacity())
      val bb = OffHeapBuffer.wrap(ByteBuffer.allocateDirect(bufSize))
      var read = 0L
      var ops: Long = 0
      while (read >= 0) {
        bb.clear()
        val fx = input.read(bb)
        if (fx == null) {
          // A null future marks end-of-stream on the direct path.
          System.err.println(" ^^^^^^^ hit NULL ")
          read = -1
        } else {
          // Block on the future; getLen is presumably the number of bytes
          // transferred — TODO(review): confirm against the Crail API.
          read = fx.get().getLen
          // fix: only count reads that actually completed with data
          if (read >= 0) ops += 1
        }
      }
      System.err.println(" --------> read " + (testDir + "/" + id) + " in " + ops + " operations")
      input.close()
      fs.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment