@speeddragon
Created January 10, 2020 02:51
StreamingFileSink
// Job definition
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Kafka source producing Avro GenericRecord messages
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)

// Bulk Parquet sink writing to S3 (see Writer.scala below)
val writer = Writer(settings.s3Path(), GenericRecordSchema.schema.toString()).build()

env
  .enableCheckpointing(checkpointInterval)
  .addSource(source)
  .addSink(writer)

env.execute()
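Checkpointing is what drives the sink: with bulk formats such as Parquet, StreamingFileSink finalizes its in-progress part files on every checkpoint, so checkpointInterval (not defined in the snippet) also controls output file granularity. A minimal sketch, assuming a fixed interval:

// Hypothetical value, not from the gist; pick it based on the
// desired file size vs. end-to-end latency trade-off.
val checkpointInterval: Long = 60 * 1000L // roll part files roughly once a minute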
// Writer.scala
import java.io.IOException

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetWriterFactory}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.io.OutputFile

case class Writer(pathString: String, schema: String) {

  def build(): StreamingFileSink[GenericRecord] = {
    val path = new Path(pathString)

    // ParquetBuilder is a serializable factory, so the sink can create
    // writers on the task managers rather than on the client
    val builder: ParquetBuilder[GenericRecord] = (out: OutputFile) =>
      createAvroParquetWriter(schema, GenericData.get, out)
    val writer = new ParquetWriterFactory(builder)

    StreamingFileSink
      .forBulkFormat(path, writer)
      .withBucketAssigner(new MessageBucketAssigner)
      .build
  }

  @throws[IOException]
  private def createAvroParquetWriter[T](schemaString: String, dataModel: GenericData, out: OutputFile) = {
    // The schema travels as a String because Avro's Schema is not serializable
    val schema = new Schema.Parser().parse(schemaString)

    AvroParquetWriter
      .builder[T](out)
      .withSchema(schema)
      .withDataModel(dataModel)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build
  }
}
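MessageBucketAssigner is not shown in the gist. It implements Flink's BucketAssigner interface, which maps each record to a bucket, i.e. a subdirectory under the sink's base path. A minimal sketch, assuming bucketing by a hypothetical "type" field on the record:

// MessageBucketAssigner.scala (hypothetical sketch, not the author's code)
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

class MessageBucketAssigner extends BucketAssigner[GenericRecord, String] {

  // Bucket id becomes a path segment under the base path, e.g. s3://bucket/path/type=click/
  override def getBucketId(element: GenericRecord, context: BucketAssigner.Context): String =
    s"type=${element.get("type")}" // "type" is a hypothetical message field

  // Bucket ids must survive checkpoints, so they need a versioned serializer
  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}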
@jainanuj07

Hi, can you please help me with the AugmentedMessageDeserializer and GenericRecordSchema class code?
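The author's reply went over email, so those implementations are not in the thread. As a rough sketch, assuming the Kafka messages are Avro binary: GenericRecordSchema could be an object holding a shared Avro schema, and AugmentedMessageDeserializer a DeserializationSchema that decodes each record against it. All field names below are hypothetical, not the author's.

// Hypothetical sketches, not the author's actual code
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema

// Shared Avro schema definition (fields are made up for illustration)
object GenericRecordSchema {
  val schema: Schema = SchemaBuilder
    .record("Message").fields()
    .requiredString("id")
    .requiredString("type")
    .requiredLong("timestamp")
    .endRecord()
}

// Turns raw Kafka bytes into GenericRecord values for the pipeline
class AugmentedMessageDeserializer extends AbstractDeserializationSchema[GenericRecord] {

  // Avro readers are not serializable, so build one lazily on each task manager
  @transient private lazy val reader =
    new GenericDatumReader[GenericRecord](GenericRecordSchema.schema)

  override def deserialize(message: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    reader.read(null, decoder)
  }
}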

@speeddragon (Author) commented Jan 25, 2020 via email
