A partial implementation of a way to write a batch table to Parquet at a path in Flink, tested on 1.10
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{SchemaBuilder, Schema => AvroSchema}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes}
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.typeutils.{OptionTypeInfo, Types}
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.parquet.avro.AvroParquetOutputFormat
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName
object Avro {

  type B = SchemaBuilder.FieldAssembler[AvroSchema]

  def parse(s: String): AvroSchema = new AvroSchema.Parser().parse(s)

  def typeInfoToField(fieldName: String, fieldType: TypeInformation[_], builder: B): B = {
    // Every field is emitted as a nullable union ["null", <primitive>] with a null default.
    val f = builder.name(fieldName).`type`().unionOf().nullType().and()
    val t = fieldType match {
      case Types.STRING => f.stringType()
      case Types.INT | Types.SQL_DATE | Types.SQL_TIME => f.intType()
      case Types.LONG | Types.SQL_TIMESTAMP => f.longType()
      case Types.FLOAT => f.floatType()
      case Types.DOUBLE => f.doubleType()
      case Types.BOOLEAN => f.booleanType()
      case Types.JAVA_BIG_DEC => f.doubleType()
      // Includes GenericTypeInfo[Option[_]]... make columns nullable :(
      case ti => throw new IllegalArgumentException(
        s"Unsupported TypeInformation $ti for field $fieldName"
      )
    }
    Try(t.endUnion().nullDefault()) match {
      case Success(finishedField) => finishedField
      case Failure(e) =>
        // Note: will fail on non-aliased coalesce/udfs which are formatted like `expr$col`
        throw new IllegalArgumentException(
          s"Avro schema creation failed for $fieldType for field $fieldName: ${e.getMessage}")
    }
  }

  def fromTypeInfo(typeInfo: RowTypeInfo, name: String, nameSpace: String): AvroSchema = {
    val inputFields = typeInfo.getFieldNames zip typeInfo.getFieldTypes
    val initBuilder: B = SchemaBuilder
      .record(name)
      .namespace(nameSpace)
      .fields()
    // Copied & reversed, keeping only primitives, from:
    // org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToTypeInfo
    val withFields = inputFields.foldLeft(initBuilder) { case (builder: B, (fieldName, fieldType)) =>
      typeInfoToField(fieldName, fieldType, builder)
    }
    withFields.endRecord()
  }

  class ConvertRowToAvro(typeInfo: RowTypeInfo, name: String, nameSpace: String)
      extends RichMapFunction[Row, GenericRecord] {

    // Keep the schema as a string so the function stays serializable; re-parse lazily on the task.
    val avroSchemaString: String = fromTypeInfo(typeInfo, name, nameSpace).toString
    @transient lazy val avroSchema: AvroSchema = parse(avroSchemaString)

    override def map(row: Row): GenericRecord = {
      val record = new GenericData.Record(avroSchema)
      val fieldNames = avroSchema.getFields.asScala.map(_.name)
      val fieldTypes = typeInfo.getFieldTypes
      (fieldNames zip fieldTypes)
        .zipWithIndex
        .foreach { case ((fieldName, fieldType), i) =>
          val rawValue = Option(row.getField(i))
          val typedValue = rawValue.map { s =>
            fieldType match {
              case Types.STRING => s.asInstanceOf[String]
              case Types.INT => s.asInstanceOf[Int]
              // Dates/times use the usual Avro-style representations: days since epoch,
              // millis of day, and epoch millis, matching the int/long columns above.
              case Types.SQL_DATE => s.asInstanceOf[java.sql.Date].toLocalDate.toEpochDay.toInt
              case Types.SQL_TIME => (s.asInstanceOf[java.sql.Time].toLocalTime.toNanoOfDay / 1000000L).toInt
              case Types.SQL_TIMESTAMP => s.asInstanceOf[java.sql.Timestamp].getTime
              case Types.LONG => s.asInstanceOf[Long]
              case Types.FLOAT => s.asInstanceOf[Float]
              case Types.DOUBLE => s.asInstanceOf[Double]
              case Types.BOOLEAN => s.asInstanceOf[Boolean]
              case Types.JAVA_BIG_DEC => s.asInstanceOf[java.math.BigDecimal].doubleValue()
              case _: OptionTypeInfo[String, Option[String]] => s.asInstanceOf[Option[String]].orNull
            }
          }
          // Box to AnyRef so the value can be handed to GenericRecord.put.
          record.put(fieldName, typedValue.map(_.asInstanceOf[AnyRef]).orNull)
        }
      record
    }
  }
}
object Parquet {

  def createOutputFormat(schema: AvroSchema, path: String): HadoopOutputFormat[Void, GenericRecord] = {
    val job = Job.getInstance()
    val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
      new AvroParquetOutputFormat[GenericRecord](),
      job
    )
    AvroParquetOutputFormat.setSchema(job, schema)
    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
    ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?
    hadoopOutFormat
  }

  def writeTableParquet(table: Table, path: String, recordName: String, recordNameSpace: String)
                       (implicit tableEnv: BatchTableEnvironment): DataSink[(Void, GenericRecord)] = {
    val rowTypeInfo: RowTypeInfo = table.getSchema.toRowType.asInstanceOf[RowTypeInfo]
    val avroConverter = new Avro.ConvertRowToAvro(rowTypeInfo, recordName, recordNameSpace)
    val avroSchema = Avro.fromTypeInfo(rowTypeInfo, recordName, recordNameSpace)
    val avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema)

    implicit val hadoopTupleTypeInfo: TypeInformation[(Void, GenericRecord)] =
      createTuple2TypeInformation(JTypes.VOID, avroTypeInfo)
    implicit val hadoopTupleTag: ClassTag[(Void, GenericRecord)] = ClassTag(classOf[(Void, GenericRecord)])

    val outdata: DataSet[(Void, GenericRecord)] = tableEnv
      .toDataSet[Row](table)(rowTypeInfo)
      .map(avroConverter)(avroTypeInfo, ClassTag(classOf[GenericRecord]))
      // Hadoop demands (key, value) pairs with a Void key, but we can't instantiate Void directly
      .map((null.asInstanceOf[Void], _)) // scalastyle:ignore null
    val outputFormat = createOutputFormat(avroSchema, path)
    outdata.output(outputFormat)
  }
}
Wrote this because we were on Flink 1.10 and we wanted to completely infer the datatype of the output from the existing `Table` without using a known AvroSchema. Flink 1.11's Parquet format doesn't seem to allow this, and while I'll admit it might be a bad idea, it was the easiest way to handle a schema derived from an external source without requiring the output schema to be kept up to date with the source. There might be a way to wrap all this into one of Flink's interfaces, no clue.
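
For context, a call site might look roughly like this. It's only a sketch: the environment setup, the `some_source` table, the output path, and the record name/namespace are assumptions, not part of the gist.

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.BatchTableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
implicit val tableEnv: BatchTableEnvironment = BatchTableEnvironment.create(env)

// Any batch Table whose schema is only known at runtime, e.g. registered from an external source.
val table: Table = tableEnv.sqlQuery("SELECT * FROM some_source")

// Derives an Avro schema from the table's RowTypeInfo and writes Snappy-compressed Parquet.
Parquet.writeTableParquet(table, "hdfs:///tmp/some_output", "SomeRecord", "com.example")

env.execute("write-table-to-parquet")
```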
Some SQL datatypes may not be handled in this example, and given the use of `GenericRecord` it might be slow. Requires these, roughly:
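
The original dependency list isn't shown here; judging purely from the imports, an sbt sketch would look something like the following. The artifact names follow Flink 1.10 / Scala 2.11 conventions, and the versions are assumptions.

```scala
// build.sbt (sketch; versions and scope choices are assumptions)
libraryDependencies ++= Seq(
  "org.apache.flink"  %% "flink-scala"                   % "1.10.1" % Provided,
  "org.apache.flink"  %% "flink-table-planner"           % "1.10.1" % Provided, // old planner, for BatchTableEnvironment
  "org.apache.flink"  %% "flink-table-api-scala-bridge"  % "1.10.1",
  "org.apache.flink"  %  "flink-avro"                    % "1.10.1",
  "org.apache.flink"  %% "flink-hadoop-compatibility"    % "1.10.1", // HadoopOutputFormat wrapper
  "org.apache.parquet" % "parquet-avro"                  % "1.11.1",
  "org.apache.hadoop"  % "hadoop-client"                 % "2.8.5"  % Provided // Job, FileOutputFormat, Path
)
```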