Skip to content

Instantly share code, notes, and snippets.

@hminle
Last active December 8, 2016 00:55
Show Gist options
  • Save hminle/60eb01011211f91ced2291168c800d5f to your computer and use it in GitHub Desktop.
Save hminle/60eb01011211f91ced2291168c800d5f to your computer and use it in GitHub Desktop.
Utils.scala
package externalsorting
import java.io.{BufferedOutputStream, File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Paths
import readInput._
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
/**
* Created by hminle on 12/5/2016.
*/
object Utils {
def getListOfFiles(dir: String): List[File] = {
val d = new File(dir)
if(d.exists() && d.isDirectory){
d.listFiles.filter(_.isFile).toList
} else List[File]()
}
def get100BytesKeyAndValue(fileChannel: FileChannel): Option[(Key, Value)] = {
val size = 100
val buffer = ByteBuffer.allocate(size)
buffer.clear()
val numOfByteRead = fileChannel.read(buffer)
buffer.flip()
if(numOfByteRead != -1){
val data: Array[Byte] = new Array[Byte](numOfByteRead)
buffer.get(data, 0, numOfByteRead)
val (key, value) = data.splitAt(10)
Some(Key(key.toList), Value(value.toList))
} else {
None
}
}
def getFileChannelFromInput(file: File): FileChannel = {
val fileChannel: FileChannel = FileChannel.open(Paths.get(file.getPath))
fileChannel
}
def estimateAvailableMemory(): Long = {
System.gc()
val runtime: Runtime = Runtime.getRuntime
val allocatedMemory: Long = runtime.totalMemory() - runtime.freeMemory()
val presFreeMemory: Long = runtime.maxMemory() - allocatedMemory
presFreeMemory
}
def writePartition(dir: String, keyValue: List[(Key, Value)]): Unit = {
val byteArray: Array[Byte] = flattenKeyValueList(keyValue).toArray[Byte]
val bos = new BufferedOutputStream(new FileOutputStream(dir))
Stream.continually(bos.write(byteArray))
bos.close()
}
def flattenKeyValueList(keyValue: List[(Key,Value)]): List[Byte] = {
keyValue flatten {
case (Key(keys), Value(values)) => keys:::values
}
}
def flattenKeyValue(keyVal: (Key, Value)): List[Byte] = {
keyVal._1.keys:::keyVal._2.values
}
def getChunkKeyAndValueBySize(size: Int, fileChannel: FileChannel): (List[(Key, Value)], Boolean) = {
val oneKeyValueSize = 100
val countMax = size / oneKeyValueSize
var isEndOfFileChannel: Boolean = false
var count = 0
val chunks: ListBuffer[(Key, Value)] = ListBuffer.empty
do{
val keyValue = get100BytesKeyAndValue(fileChannel)
if(keyValue.isDefined) chunks.append(keyValue.get)
isEndOfFileChannel = !keyValue.isDefined
count += 1
}while(!isEndOfFileChannel && count < countMax)
(chunks.toList, isEndOfFileChannel)
}
def getSortedChunk(oneChunk: List[(Key, Value)]): List[(Key, Value)] = {
oneChunk.sortWith((_._1 < _._1))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment