Skip to content

Instantly share code, notes, and snippets.

@asm0dey
Created March 30, 2020 09:47
Show Gist options
  • Save asm0dey/7720e08ebd637a3064542795ff915a44 to your computer and use it in GitHub Desktop.
Save asm0dey/7720e08ebd637a3064542795ff915a44 to your computer and use it in GitHub Desktop.
Reading ORC data not from files, but from input streams
// Gradle build for the ORC-from-stream experiment: Kotlin/JVM plus the ORC and Hadoop client libraries.
plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.3.70'
}

group = 'org.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // ORC reader internals (InStream, BufferChunk, OrcCodecPool) and the generated OrcProto classes.
    implementation 'org.apache.orc:orc-core:1.6.2'
    implementation 'org.apache.hadoop:hadoop-common:3.1.2'
    implementation 'org.apache.hadoop:hadoop-hdfs-client:3.1.2'
    implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
}

// Both main and test sources are compiled to Java 8 bytecode.
compileKotlin {
    kotlinOptions {
        jvmTarget = '1.8'
    }
}

compileTestKotlin {
    kotlinOptions {
        jvmTarget = '1.8'
    }
}
import org.apache.orc.CompressionKind
import org.apache.orc.OrcProto
import org.apache.orc.OrcProto.*
import org.apache.orc.OrcUtils
import org.apache.orc.impl.BufferChunk
import org.apache.orc.impl.InStream
import org.apache.orc.impl.InStream.StreamOptions
import org.apache.orc.impl.OrcCodecPool
import java.io.FileInputStream
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.file.Paths
/**
 * Entry point: reads the postscript and the footer of the ORC file by hand and prints
 * the footer length, the compression kind, the postscript length and the stripe information.
 */
@ExperimentalUnsignedTypes
fun main() {
    // For comparison, the regular reader API exposes the same stripe information:
    //   OrcFile.createReader(Path("/home/finkel/output.orc"), OrcFile.ReaderOptions(Configuration()))
    //       .stripes
    //       .forEach { println("it.offset = ${it.offset}") }
    val tail = readPostScript()
    val ps = tail.first
    val psLength = tail.second

    println("postscript.footerLength = ${ps.footerLength}")
    println("postscript.compression = ${ps.compression}")
    println("postscriptLength = $psLength")

    val parsedFooter = readFooter(ps, psLength)
    println("footer?.stripesCount = ${parsedFooter?.stripesCount}")
    println("footer?.stripesList = ${parsedFooter?.stripesList}")
}
/** Maps the protobuf compression enum to the ORC API enum constant with the same name. */
private val OrcProto.CompressionKind.orc
    get() = CompressionKind.valueOf(this.name)
/**
 * Reads and parses the ORC file footer.
 *
 * Only the tail of the file is loaded: the footer, the postscript and the trailing
 * one-byte postscript length. The footer is decompressed with the codec named in
 * [postscript] unless the file is uncompressed.
 *
 * @param postscript the already parsed postscript; supplies the footer length,
 *   the compression kind and the compression block size.
 * @param postscriptLength the serialized postscript size in bytes, as stored in the last byte of the file.
 * @return the parsed footer.
 * @throws IllegalArgumentException if the file is too short to contain the footer described by [postscript].
 */
@ExperimentalUnsignedTypes
fun readFooter(
    postscript: PostScript,
    postscriptLength: Int
): Footer? {
    val footerLength = postscript.footerLength
    // Tail layout: [footer][postscript][1-byte postscript length].
    val tailSize = footerLength + postscriptLength + 1
    // Despite its name, readLastNBytesFromStream skips its argument from the START of the file,
    // so the tail size has to be converted into a skip count.
    val footerBytes = readLastNBytesFromStream((file.length() - tailSize).coerceAtLeast(0))
        .use { it.readAllBytes() }

    val psOffset = footerBytes.size.toLong() - 1 - postscriptLength
    val footerOffset = psOffset - footerLength
    require(footerLength >= 0 && footerOffset >= 0) {
        "File tail of ${footerBytes.size} bytes cannot hold a footer of $footerLength bytes " +
            "and a postscript of $postscriptLength bytes"
    }

    val kind = postscript.compression.orc
    if (kind == CompressionKind.NONE) {
        // Parse only the footer slice; the postscript and the length byte follow it in the buffer.
        return Footer.parseFrom(footerBytes.copyOfRange(footerOffset.toInt(), psOffset.toInt()))
    }

    val codec = OrcCodecPool.getCodec(kind)
    try {
        val footerStream = InStream.create(
            "footer",
            BufferChunk(ByteBuffer.wrap(footerBytes), 0),
            footerOffset,
            footerLength,
            StreamOptions().withCodec(codec).withBufferSize(postscript.compressionBlockSize.toInt())
        )
        return Footer.parseFrom(InStream.createCodedInputStream(footerStream))
    } finally {
        // Pooled codecs must be handed back, otherwise every call leaks one.
        OrcCodecPool.returnCodec(kind, codec)
    }
}
/** Upper bound on the serialized postscript size; its length is stored in a single byte. */
private const val MAX_POSTSCRIPT_SIZE = 256

/**
 * Reads the postscript from the end of the ORC file.
 *
 * The last byte of the file holds the postscript length and the postscript itself sits
 * immediately before that byte. It is parsed directly, without decompression.
 *
 * @return the parsed postscript paired with its serialized length in bytes.
 * @throws IllegalStateException if the file is empty or shorter than the postscript it declares.
 */
private fun readPostScript(): Pair<PostScript, Int> {
    // Files shorter than the maximum tail are read whole instead of asking for a negative skip.
    val bytesToSkip = (file.length() - (MAX_POSTSCRIPT_SIZE + 1)).coerceAtLeast(0)
    val tail = readLastNBytesFromStream(bytesToSkip).use { it.readAllBytes() }
    check(tail.isNotEmpty()) { "ORC file is empty: $file" }

    // The last byte stores the postscript length. Byte is signed in Kotlin, so mask it
    // to get the unsigned value (lengths above 127 would otherwise come out negative).
    val postscriptLength = tail.last().toInt() and 0xFF
    println("postscriptLength = $postscriptLength")
    check(postscriptLength < tail.size) {
        "Postscript length $postscriptLength exceeds the ${tail.size} bytes available at the end of the file"
    }

    // The postscript can't be encoded/compressed by specification, so parse the raw bytes.
    val postscriptEnd = tail.size - 1
    return PostScript.parseFrom(
        tail.copyOfRange(postscriptEnd - postscriptLength, postscriptEnd)
    ) to postscriptLength
}
/**
 * Opens a fresh stream over the input file and positions it [n] bytes from the start.
 *
 * Note: despite the name, [n] is the number of leading bytes to SKIP, not the number of
 * trailing bytes to read. To read the last `k` bytes, pass `file.length() - k`.
 *
 * @param n number of bytes to skip from the beginning of the stream; values <= 0 skip nothing.
 * @return the open stream, positioned after the skipped bytes (or at end of stream if it is
 *   shorter than [n]). The caller is responsible for closing it.
 */
private fun readLastNBytesFromStream(n: Long): InputStream {
    val stream = inputStream
    try {
        var remaining = n
        // InputStream.skip may skip fewer bytes than requested, so loop until done.
        while (remaining > 0) {
            val skipped = stream.skip(remaining)
            if (skipped > 0) {
                remaining -= skipped
            } else if (stream.read() == -1) {
                break // end of stream reached before n bytes were skipped
            } else {
                remaining-- // skip made no progress, but one byte was consumed by read()
            }
        }
    } catch (e: Throwable) {
        // Do not leak the file handle when positioning fails.
        stream.close()
        throw e
    }
    return stream
}
/** The ORC file whose tail is inspected. */
var file = Paths.get("/home/finkel/output.orc").toFile()

/** Opens a new stream over [file] on every access; each caller must close the stream it receives. */
val inputStream: FileInputStream
    get() = FileInputStream(file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment