Read/write Parquet files in Kotlin/Java
dependencies {
    ...
    implementation 'org.apache.parquet:parquet-avro:1.10.0'
    implementation 'org.apache.hadoop:hadoop-common:3.1.0'
    implementation 'io.github.serpro69:kotlin-faker:1.9.0'
    ...
}
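If the build script uses the Gradle Kotlin DSL instead of Groovy, the same declarations would look roughly like this (a minimal sketch assuming the same versions as above; the elided lines are not reproduced):

dependencies {
    // ...
    // Parquet <-> Avro bridge used by AvroParquetWriter / ParquetFileReader below
    implementation("org.apache.parquet:parquet-avro:1.10.0")
    // Hadoop classes (Configuration, Path, HadoopInputFile) required by the Parquet API
    implementation("org.apache.hadoop:hadoop-common:3.1.0")
    // kotlin-faker generates the fake column values in writeFakeParquet
    implementation("io.github.serpro69:kotlin-faker:1.9.0")
    // ...
}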
package com.my.helpers

import io.github.serpro69.kfaker.faker
import mu.KLogging
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.reflect.ReflectData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory
import kotlin.random.Random
import org.apache.hadoop.conf.Configuration as HConf
import org.apache.hadoop.fs.Path as HfsPath

object ParquetUtils : KLogging() {
    fun readParquet(path: String) {
        val reader = ParquetFileReader.open(HadoopInputFile.fromPath(HfsPath(path), HConf()))
        val schema = reader.footer.fileMetaData.schema
        logger.debug { "schema: $schema" }
        val fields = schema.fields.map { it.name }
        // readNextRowGroup() returns null once all row groups are consumed, which terminates the sequence;
        // forEach is a terminal operation, so every row group is actually read.
        generateSequence { reader.readNextRowGroup() }
            .forEach { pages ->
                val columnIO = ColumnIOFactory().getColumnIO(schema)
                val recordReader = columnIO.getRecordReader(pages, GroupRecordConverter(schema))
                (0 until pages.rowCount).forEach { _ ->
                    val simpleGroup = recordReader.read() as SimpleGroup
                    fields.forEach { field ->
                        val fieldIdx = simpleGroup.type.getFieldIndex(field)
                        (0 until simpleGroup.getFieldRepetitionCount(fieldIdx)).forEach { idx ->
                            val value = simpleGroup.getValueToString(fieldIdx, idx)
                            logger.info { "$field $value" }
                        }
                    }
                }
            }
        printMemUsage("readParquet close reader")
        reader.close()
        System.gc()
        printMemUsage("readParquet after close reader and gc")
    }
    /**
     * Demo: writes a Parquet file filled with fake data.
     */
    fun writeFakeParquet(path: String, totalColumns: Int = 100, totalRows: Int = 1000000, doubleColumns: Int = 20) {
        val faker = faker { }
        val rnd = Random(System.currentTimeMillis())
        val conf = HConf()
        val dataFile = HfsPath(path)
        // Build an Avro schema with totalColumns string fields and doubleColumns double fields.
        val sch = SchemaBuilder.record("data").doc("random").namespace("xxx").fields().let { sbfa ->
            var x = sbfa
            (0 until totalColumns).forEach { i ->
                x = x.name("field_$i").type().stringType().noDefault()
            }
            (0 until doubleColumns).forEach { i ->
                x = x.name("field_${i}_d").type().doubleType().noDefault()
            }
            x
        }.endRecord()
        logger.info { "schema: $sch" }
        logger.info { "writer start" }
        AvroParquetWriter.builder<GenericRecord>(dataFile)
            // .withSchema(ReflectData.AllowNull.get().getSchema(typeOf<MyCls>().javaType)) // generate nullable fields
            .withSchema(sch)
            .withDataModel(ReflectData.get())
            .withRowGroupSize(64 * 1024 * 1024) // default is ~128 000 000 b; 1 MB ~ 560 records
            .withConf(conf)
            .withCompressionCodec(GZIP) // allocated memory: 52510101
            // .withCompressionCodec(UNCOMPRESSED) // allocated memory: 81675633 (~56 MB)
            .withWriteMode(OVERWRITE)
            .build().use { writer ->
                (0 until totalRows).forEach { _ ->
                    writer.write(
                        GenericData.Record(sch).let { rec ->
                            (0 until totalColumns).forEach { i ->
                                when (i) {
                                    in 0..9 -> rec.put("field_$i", faker.address.fullAddress())
                                    in 10..19 -> rec.put("field_$i", faker.bank.name())
                                    in 20..29 -> rec.put("field_$i", faker.animal.name())
                                    in 30..39 -> rec.put("field_$i", faker.beer.name())
                                    in 40..49 -> rec.put("field_$i", faker.blood.group())
                                    in 50..59 -> rec.put("field_$i", faker.book.title())
                                    in 60..69 -> rec.put("field_$i", faker.coin.flip())
                                    else -> rec.put("field_$i", faker.cat.name())
                                }
                            }
                            (0 until doubleColumns).forEach { i ->
                                rec.put("field_${i}_d", rnd.nextDouble())
                            }
                            rec
                        }
                    )
                }
            }
        printMemUsage("writeFakeParquet before gc")
        System.gc()
        printMemUsage("writeFakeParquet after gc")
        logger.info { "writer end" }
    }
    fun printMemUsage(prefix: String = "") {
        val runtime = Runtime.getRuntime()
        val usedMemInMB = (runtime.totalMemory() - runtime.freeMemory()) / 1048576L
        val maxHeapSizeInMB = runtime.maxMemory() / 1048576L
        val availHeapSizeInMB = maxHeapSizeInMB - usedMemInMB
        logger.info { "$prefix usedMemInMB: $usedMemInMB, maxHeapSizeInMB: $maxHeapSizeInMB, availHeapSizeInMB: $availHeapSizeInMB" }
    }
}
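A minimal usage sketch (not part of the gist itself): the file path and the row/column counts below are arbitrary assumptions chosen to keep the run small.

fun main() {
    // Hypothetical local path; any Hadoop-compatible URI would work as well.
    val path = "/tmp/fake-data.parquet"
    // Write a small fake dataset, then read it back and log every field value.
    ParquetUtils.writeFakeParquet(path, totalColumns = 10, totalRows = 1_000, doubleColumns = 2)
    ParquetUtils.readParquet(path)
}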