@seykron
Created February 15, 2019 03:07
package net.borak.support.csv

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.BufferedReader

class CSVParser {

    companion object {
        /** Field separator: comma. */
        private const val SEPARATOR: Char = ','
        /** Field delimiter: double quote. */
        private const val DOUBLE_QUOTE: Char = '"'
        /** Escape character: backslash. */
        private const val ESCAPE: Char = '\\'
        /** Number of concurrent record-parsing coroutines. */
        private const val PARSER_JOBS: Int = 10
    }

    private val logger: Logger = LoggerFactory.getLogger(CSVParser::class.java)

    /**
     * Reads the CSV line by line and invokes [callback] once per parsed record.
     * Lines are produced on the IO dispatcher and consumed by [PARSER_JOBS] coroutines.
     */
    fun parse(
        csvReader: BufferedReader,
        callback: (List<String>) -> Unit
    ) = runBlocking {
        logger.info("parsing csv started")

        val parserChannel = parseCsv(
            reader = csvReader
        )

        (0 until PARSER_JOBS).map { index ->
            parseRecordAsync(index, parserChannel, callback)
        }.forEach { future ->
            future.await()
        }

        logger.info("parsing csv finished")
    }

    /** Launches a consumer coroutine that parses every line it receives from [parserChannel]. */
    private inline fun CoroutineScope.parseRecordAsync(
        id: Int,
        parserChannel: ReceiveChannel<String>,
        crossinline callback: (List<String>) -> Unit
    ) = async(CoroutineName("CSV record parser $id")) {
        var count = 0
        logger.info("record parser listening for records")

        for (line in parserChannel) {
            callback(parseRecord(line))
            count += 1
        }

        logger.info("record parser $id processed $count elements")
    }

    /**
     * Splits a single raw line into fields, honoring double-quoted fields and
     * backslash escapes. Surrounding quotes are kept as part of the field text.
     */
    private fun parseRecord(rawRecord: String): List<String> {
        var withinField = false
        var escape = false
        var startIndex = 0
        val record: MutableList<String> = mutableListOf()

        for (index in rawRecord.indices) {
            val char = rawRecord[index]

            when {
                !escape && !withinField && char == SEPARATOR -> {
                    record.add(rawRecord.slice(startIndex until index))
                    startIndex = index + 1
                }
                !escape && char == DOUBLE_QUOTE ->
                    withinField = !withinField
                !escape && char == ESCAPE ->
                    escape = true
                escape -> escape = false
            }
        }

        // Append the field after the last separator; the original version dropped it.
        record.add(rawRecord.slice(startIndex until rawRecord.length))

        return record
    }

    /** Producer coroutine: reads the input on the IO dispatcher and sends one line per record. */
    private fun CoroutineScope.parseCsv(reader: BufferedReader) = produce(Dispatchers.IO) {
        var size: Long = 0
        var count = 0
        logger.info("csv line parser ready for sending records")

        while (true) {
            val line = reader.readLine()

            if (line != null) {
                size += line.length
                count += 1
                send(line)
            } else {
                logger.info("Items: {}, size: {}", count, size)
                break
            }
        }
    }
}
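Usage sketch (not part of the original gist): a minimal example of driving the parser from an in-memory string, assuming it lives alongside CSVParser in net.borak.support.csv. The sample data, the main function, and the records list are illustrative only; any Reader can be wrapped with buffered() the same way the test below wraps a file.

fun main() {
    // Two sample records; quoted fields may contain the separator.
    val csv = """
        "Alice",30,"Buenos Aires, AR"
        "Bob",42,"La Plata, AR"
    """.trimIndent()

    // The callback is invoked from parser coroutines, so collect into a thread-safe list defensively.
    val records = java.util.Collections.synchronizedList(mutableListOf<List<String>>())

    CSVParser().parse(csvReader = csv.reader().buffered()) { record ->
        records.add(record)
    }

    println("parsed ${records.size} records of ${records.firstOrNull()?.size} fields each")
}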
package net.borak.support.csv

import org.junit.Test
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.nio.charset.Charset

class CSVParserTest {

    companion object {
        /** Read buffer size: 150 MB. */
        private const val BUFFER_SIZE: Int = 1024 * 1024 * 150
    }

    private val logger: Logger = LoggerFactory.getLogger(CSVParserTest::class.java)

    /** Large sample file; it must exist in the working directory for this test to run. */
    private val source: File = File("huge.csv")

    @Test
    fun parse() {
        val reader = source.bufferedReader(
            charset = Charset.defaultCharset(),
            bufferSize = BUFFER_SIZE
        )

        CSVParser().parse(
            csvReader = reader
        ) { record ->
            // logger.info(record.joinToString(" AND "))
        }
    }
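    // A minimal self-contained sketch (not part of the original gist): parses a small
    // in-memory CSV instead of the local "huge.csv", and checks that a comma inside a
    // quoted field is not treated as a field boundary. The sample data and expected
    // counts below are assumptions for illustration only.
    @Test
    fun parseInMemoryRecord() {
        val csv = "a,\"b,with comma\",c\n1,2,3"
        val records = java.util.Collections.synchronizedList(mutableListOf<List<String>>())

        CSVParser().parse(csvReader = csv.reader().buffered()) { record ->
            records.add(record)
        }

        org.junit.Assert.assertEquals(2, records.size)
        // Every record has three fields; the quoted comma stays inside its field.
        records.forEach { record -> org.junit.Assert.assertEquals(3, record.size) }
    }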
}