Read/write Parquet files in Kotlin/Java
dependencies {
    ...
    implementation 'org.apache.parquet:parquet-avro:1.10.0'
    implementation 'org.apache.hadoop:hadoop-common:3.1.0'
    implementation 'io.github.serpro69:kotlin-faker:1.9.0'
    ...
}
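
For builds that use the Gradle Kotlin DSL (build.gradle.kts) instead of Groovy, the equivalent block would look like this — a sketch assuming the same versions:

dependencies {
    implementation("org.apache.parquet:parquet-avro:1.10.0")
    implementation("org.apache.hadoop:hadoop-common:3.1.0")
    implementation("io.github.serpro69:kotlin-faker:1.9.0")
}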
package com.my.helpers

import io.github.serpro69.kfaker.faker
import mu.KLogging
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.reflect.ReflectData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory
import kotlin.random.Random
import org.apache.hadoop.conf.Configuration as HConf
import org.apache.hadoop.fs.Path as HfsPath
object ParquetUtils : KLogging() {

    /**
     * Demo: read a Parquet file row group by row group and log every value.
     */
    fun readParquet(path: String) {
        val reader = ParquetFileReader.open(HadoopInputFile.fromPath(HfsPath(path), HConf()))
        val schema = reader.footer.fileMetaData.schema
        logger.debug { "schema: $schema" }
        val fields = schema.fields.map { it.name }
        // generateSequence already stops when readNextRowGroup() returns null, so no
        // takeWhile is needed; forEach is the terminal operation that actually pulls
        // the row groups (a bare map on a lazy Sequence would never execute).
        generateSequence { reader.readNextRowGroup() }
            .forEach { pages ->
                val columnIO = ColumnIOFactory().getColumnIO(schema)
                val recordReader = columnIO.getRecordReader(pages, GroupRecordConverter(schema))
                (0 until pages.rowCount).forEach { _ ->
                    val simpleGroup = recordReader.read() as SimpleGroup
                    fields.forEach { field ->
                        val fieldIdx = simpleGroup.type.getFieldIndex(field)
                        (0 until simpleGroup.getFieldRepetitionCount(fieldIdx)).forEach { idx ->
                            val value = simpleGroup.getValueToString(fieldIdx, idx)
                            logger.info { "$field $value" }
                        }
                    }
                }
            }
        printMemUsage("readParquet close reader")
        reader.close()
        System.gc()
        printMemUsage("readParquet after close reader and gc")
    }
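
    /**
     * Alternative read path (a sketch, not part of the original gist): the Avro
     * binding yields whole GenericRecords instead of walking columns by hand.
     * Uses the Path-based builder that exists in parquet-avro 1.10.0.
     */
    fun readParquetAvro(path: String) {
        org.apache.parquet.avro.AvroParquetReader.builder<GenericRecord>(HfsPath(path))
            .build()
            .use { reader ->
                generateSequence { reader.read() } // read() returns null at end of file
                    .forEach { record -> logger.info { "$record" } }
            }
    }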
    /**
     * Demo: write a Parquet file filled with fake data.
     */
    fun writeFakeParquet(path: String, totalColumns: Int = 100, totalRows: Int = 1000000, doubleColumns: Int = 20) {
        val faker = faker { }
        val rnd = Random(System.currentTimeMillis())
        val conf = HConf()
        val dataFile = HfsPath(path)
        val sch = SchemaBuilder.record("data").doc("random").namespace("xxx").fields().let { sbfa ->
            var x = sbfa
            (0 until totalColumns).forEach { i ->
                x = x.name("field_$i").type().stringType().noDefault()
            }
            (0 until doubleColumns).forEach { i ->
                x = x.name("field_${i}_d").type().doubleType().noDefault()
            }
            x
        }.endRecord()
logger.info { "schema: " + sch }
logger.info { "writer start" }
AvroParquetWriter.builder<GenericRecord>(dataFile)
// .withSchema(ReflectData.AllowNull.get().getSchema(typeOf<MyCls>().javaType)) // generate nullable fields
.withSchema(sch) // generate nullable fields
.withDataModel(ReflectData.get())
.withRowGroupSize(64 * 1024 * 1024)// default ~ 128 000 000 b, 1mb ~ 560 records
.withConf(conf)
.withCompressionCodec(GZIP) //allocated memory: 52510101
// .withCompressionCodec(UNCOMPRESSED)//allocated memory: 81675633 56 mb
.withWriteMode(OVERWRITE)
.build().use { writer ->
                (0 until totalRows).forEach { _ ->
                    writer.write(
                        GenericData.Record(sch).let { rec ->
                            (0 until totalColumns).forEach { i ->
                                when (i) {
                                    in 0..9 -> rec.put("field_$i", faker.address.fullAddress())
                                    in 10..19 -> rec.put("field_$i", faker.bank.name())
                                    in 20..29 -> rec.put("field_$i", faker.animal.name())
                                    in 30..39 -> rec.put("field_$i", faker.beer.name())
                                    in 40..49 -> rec.put("field_$i", faker.blood.group())
                                    in 50..59 -> rec.put("field_$i", faker.book.title())
                                    in 60..69 -> rec.put("field_$i", faker.coin.flip())
                                    else -> rec.put("field_$i", faker.cat.name())
                                }
                            }
                            (0 until doubleColumns).forEach { i ->
                                rec.put("field_${i}_d", rnd.nextDouble())
                            }
                            rec
                        }
                    )
                }
            }
printMemUsage("writeFakeParquet before gc")
System.gc()
printMemUsage("writeFakeParquet after gc")
logger.info { "writer end" }
}
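
    /**
     * Sketch of the reflection-based variant hinted at by the commented-out
     * withSchema(ReflectData.AllowNull...) line in writeFakeParquet: derive the
     * schema from a class instead of SchemaBuilder. MyRow is a hypothetical
     * example class, not part of the original gist; behaviour is assumed, not
     * verified against parquet-avro 1.10.0.
     */
    data class MyRow(val name: String = "", val score: Double = 0.0)

    fun writeReflected(path: String, rows: List<MyRow>) {
        // AllowNull maps non-primitive reflected fields to ["null", ...] unions
        val schema = ReflectData.AllowNull.get().getSchema(MyRow::class.java)
        AvroParquetWriter.builder<MyRow>(HfsPath(path))
            .withSchema(schema)
            .withDataModel(ReflectData.AllowNull.get())
            .withWriteMode(OVERWRITE)
            .build().use { writer -> rows.forEach { writer.write(it) } }
    }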
    fun printMemUsage(prefix: String = "") {
        val runtime = Runtime.getRuntime()
        val usedMemInMB = (runtime.totalMemory() - runtime.freeMemory()) / 1048576L
        val maxHeapSizeInMB = runtime.maxMemory() / 1048576L
        val availHeapSizeInMB = maxHeapSizeInMB - usedMemInMB
        logger.info { "$prefix usedMemInMB: $usedMemInMB, maxHeapSizeInMB: $maxHeapSizeInMB, availHeapSizeInMB: $availHeapSizeInMB" }
    }
}
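
A minimal entry point tying the two helpers together; the output path and the small sizes below are illustrative, not from the gist:

fun main() {
    val path = "/tmp/fake.parquet" // hypothetical output location
    ParquetUtils.writeFakeParquet(path, totalColumns = 10, totalRows = 1_000, doubleColumns = 2)
    ParquetUtils.readParquet(path)
}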