
@hotienvu
Created March 4, 2019 15:46
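Reads a Parquet file (people.parquet) with column projection and predicate pushdown in three ways: Spark's VectorizedParquetRecordReader, ParquetFileReader footer metadata used to build a projected Parquet schema, and AvroParquetReader driven by an Avro projection plus a FilterPredicate.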
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport, AvroSchemaConverter}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.Types
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

import scala.collection.JavaConverters._
object Main {

  def main(args: Array[String]): Unit = {
    // 1) Read with Spark's VectorizedParquetRecordReader, projecting only the "name" column.
    //    (The constructor signature varies across Spark versions; here `true` enables
    //    off-heap column vectors, as in Spark 2.3.x.)
    val parquetReader = new VectorizedParquetRecordReader(true)
    parquetReader.initialize("people.parquet", List("name").asJava)
    val batch = parquetReader.resultBatch()
    var numBatches = 0
    while (parquetReader.nextBatch()) {
      numBatches += 1
      println(s"no. rows: ${batch.numRows()}")
      batch.rowIterator().asScala.foreach(row => println(row.getString(0)))
    }
    println(s"num batches: $numBatches")
    parquetReader.close()
    // 2) Read the file footer to get the Parquet schema, build a projected schema
    //    containing only the "age" and "name" columns, and convert it to an Avro schema.
    val path = new Path("people.parquet")
    val metadata = ParquetFileReader
      .readFooter(new Configuration(), path, ParquetMetadataConverter.NO_FILTER)
      .getFileMetaData
    val parquetSchema = metadata.getSchema
    val builder = Types.buildMessage()
    List("age", "name").foreach { field =>
      // Only the top-level component of a (possibly nested) column path is projected here.
      val parts = field.split('.')
      if (parts.nonEmpty) {
        builder.addField(parquetSchema.getType(parts(0)))
      }
    }
    val projectedSchema = builder.named("Person")
    val avroSchema = new AvroSchemaConverter().convert(projectedSchema)
    println(avroSchema.toString(true))
    // 3) Read with AvroParquetReader, pushing the Avro projection through the
    //    Configuration and a filter predicate (age >= 10 AND name != "null") into the reader.
    val gtEq = FilterApi.gtEq(FilterApi.longColumn("age"), java.lang.Long.valueOf(10L))
    val isNotNull = FilterApi.notEq(FilterApi.binaryColumn("name"), Binary.fromString("null"))
    val filter = FilterApi.and(gtEq, isNotNull)
    println(filter)

    val conf = new Configuration()
    conf.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, avroSchema.toString)
    // Registering the predicate on the Configuration (as ParquetInputFormat would)
    // also exposes its human-readable form under the key printed below.
    ParquetInputFormat.setFilterPredicate(conf, filter)
    println(conf.get("parquet.private.read.filter.predicate.human.readable"))

    val reader = AvroParquetReader.builder[GenericRecord](new Path("people.parquet"))
      .withConf(conf)
      .withFilter(FilterCompat.get(filter))
      .build()
    try {
      var record: GenericRecord = reader.read()
      while (record != null) {
        println(record)
        record = reader.read()
      }
    } finally {
      reader.close()
    }
  }
}
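
For completeness, one way to produce a people.parquet file that the snippet above can read is to write it with AvroParquetWriter. This is only a minimal sketch: the object name WritePeople, the sample rows, and the "Person" Avro schema below are illustrative assumptions; the reader only relies on the file having a string "name" column and a long "age" column.

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter

object WritePeople {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema; the reader above only assumes "name" (string) and "age" (long).
    val schema: Schema = SchemaBuilder.record("Person")
      .fields()
      .requiredString("name")
      .requiredLong("age")
      .endRecord()

    // Writes a single local file named people.parquet (fails if the file already exists).
    val writer = AvroParquetWriter.builder[GenericRecord](new Path("people.parquet"))
      .withSchema(schema)
      .build()
    try {
      // Hypothetical sample rows, including a literal "null" name so the
      // notEq(name, "null") predicate above has something to filter out.
      Seq(("alice", 30L), ("bob", 5L), ("null", 42L)).foreach { case (name, age) =>
        val record = new GenericData.Record(schema)
        record.put("name", name)
        record.put("age", age)
        writer.write(record)
      }
    } finally {
      writer.close()
    }
  }
}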