Skip to content

Instantly share code, notes, and snippets.

@domnikl
Created March 1, 2023 09:25
Show Gist options
  • Save domnikl/c36ed355deaf31268c3d0406c72bdf03 to your computer and use it in GitHub Desktop.
Save domnikl/c36ed355deaf31268c3d0406c72bdf03 to your computer and use it in GitHub Desktop.
Parquet Writer
/**
 * Writes Avro [SpecificRecord] instances to a single Parquet file.
 *
 * The underlying `AvroParquetWriter` is built lazily on first use. It is configured with the
 * given schema, a `GenericData` model that has `TimeConversions.DateConversion` registered,
 * `ParquetFileWriter.Mode.OVERWRITE` and `CompressionCodecName.SNAPPY`.
 *
 * Implements [AutoCloseable], so instances can be managed with `use { }`.
 *
 * @param T the record type accepted by [write].
 * @param outputFile path of the Parquet file to write, resolved through a Hadoop `Path`.
 * @param schema Avro schema handed to the underlying writer.
 * @param conf Hadoop configuration used to resolve [outputFile].
 */
class ParquetWriter<T : SpecificRecord>(
    private val outputFile: String,
    schema: Schema,
    conf: Configuration,
) : AutoCloseable {
    private var closed = false

    // Built on first access, so constructing a ParquetWriter does not touch the output file yet.
    private val writer by lazy {
        val timeSupport = GenericData().also {
            it.addLogicalTypeConversion(TimeConversions.DateConversion())
        }

        AvroParquetWriter.builder<T>(HadoopOutputFile.fromPath(Path(outputFile), conf))
            .withSchema(schema)
            .withDataModel(timeSupport)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build()
    }

    /**
     * Writes every record in [records] to the output file, in iteration order.
     *
     * An empty collection is a no-op and does not create the underlying writer.
     *
     * @throws IllegalStateException if this writer has already been closed.
     */
    fun write(records: Collection<T>) {
        check(!closed) { "The writer is closed and cannot be used to write anymore" }

        for (record in records) {
            writer.write(record)
        }
    }

    /**
     * Closes the underlying writer and marks this instance as closed.
     *
     * Calling it again after the first call is a no-op. The instance is marked closed even when
     * the underlying `close()` throws, so a failed close cannot be followed by further writes.
     * If nothing was written before, the underlying writer is still created and closed here.
     */
    override fun close() {
        if (closed) return

        try {
            writer.close()
        } finally {
            // Set in `finally` so a failing close still leaves the wrapper unusable for writes.
            closed = true
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment