Skip to content

Instantly share code, notes, and snippets.

@seykron
Created February 13, 2019 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seykron/9edc50762a164a83f5221ff75ea7bcd3 to your computer and use it in GitHub Desktop.
Save seykron/9edc50762a164a83f5221ff75ea7bcd3 to your computer and use it in GitHub Desktop.
package net.borak.support.csv
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.BufferedReader
import java.io.File
import java.nio.charset.Charset
/**
 * Reads a file line by line and fans the lines out to a fixed number of
 * concurrent consumer coroutines over a channel.
 *
 * The consumers currently only count the lines they receive and log the
 * total; no actual CSV field parsing is performed in this class.
 */
class CSVParser {
    private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

    /**
     * Reads [csvFile] using the platform default charset and blocks the calling
     * thread until every line has been consumed.
     *
     * The reader is closed when this function returns, whether it completes
     * normally or fails with an exception.
     *
     * @param csvFile file to read.
     */
    fun parse(csvFile: File) = runBlocking {
        logger.info("parsing started")
        csvFile.bufferedReader(
            charset = Charset.defaultCharset(),
            bufferSize = BUFFER_SIZE,
        ).use { reader ->
            // The reader must stay open until the producer has drained it. The
            // consumers only finish once the channel is closed, which happens when
            // the producer completes, so awaiting all of them inside `use` is enough.
            val parserChannel = loadRecords(reader = reader)
            (0 until PARSER_JOBS)
                .map { index -> parseRecord(index, parserChannel) }
                .forEach { worker -> worker.await() }
        }
        logger.info("parsing finished")
    }

    /**
     * Starts a consumer that drains [parserChannel] until it is closed and then
     * logs how many lines it received.
     *
     * @param id identifier used only in the log message.
     * @param parserChannel channel the lines are received from.
     * @return the deferred handle of the started consumer coroutine.
     */
    private fun CoroutineScope.parseRecord(
        id: Int,
        parserChannel: ReceiveChannel<String>,
    ) = async {
        var count = 0
        for (line in parserChannel) {
            count += 1
        }
        logger.info("Parser $id has $count elements")
    }

    /**
     * Starts a producer on [Dispatchers.IO] that sends every line of [reader]
     * to the returned channel and logs the number of lines and their total
     * length in characters once the end of the input is reached.
     *
     * This function does not close [reader]; the caller owns it.
     *
     * @param reader source of the lines.
     * @return channel that receives the lines, without line terminators.
     */
    private fun CoroutineScope.loadRecords(reader: BufferedReader): ReceiveChannel<String> =
        produce<String>(Dispatchers.IO) {
            var size: Long = 0
            var count = 0
            while (true) {
                // readLine() returns null at end of input.
                val line = reader.readLine() ?: break
                size += line.length
                count += 1
                send(line)
            }
            logger.info("Items: {}, size: {}", count, size)
        }

    companion object {
        // Read buffer size, in chars (150 * 1024 * 1024). BufferedReader allocates
        // a char array of this length, i.e. roughly 300 MiB of heap.
        private const val BUFFER_SIZE: Int = 1024 * 1024 * 150

        // Number of concurrent consumer coroutines reading from the channel.
        private const val PARSER_JOBS: Int = 10
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment