Skip to content

Instantly share code, notes, and snippets.

@speeddragon
Last active January 16, 2020 13:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save speeddragon/6a98805d7f4aacff729f3d60b6a57ff8 to your computer and use it in GitHub Desktop.
Save speeddragon/6a98805d7f4aacff729f3d60b6a57ff8 to your computer and use it in GitHub Desktop.
Custom file sink for Parquet (List[GenericRecord])
// Flink job wiring: Kafka (Avro GenericRecord) -> keyed event-time windows -> Parquet files on S3.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Event time: windows are driven by the records' own timestamps, not wall-clock arrival.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
// Kafka source. AscendingTimestampExtractor assumes strictly ascending "timestamp"
// values per Kafka partition — out-of-order input would produce late records.
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[GenericRecord] {
// Event time is parsed from the record's "timestamp" field (ISO-8601 instant).
def extractAscendingTimestamp(element: GenericRecord): Long =
Instant
.parse(element.get("timestamp").asInstanceOf[String])
.toEpochMilli()
})
// Sink that writes each window's batch of records as one Parquet file under the S3 path.
val writer = WindowParquetGenericRecordListFileSink(settings.s3Path(), GenericRecordSchema.schema.toString())
...
// RocksDB state backend; second arg presumably enables incremental checkpoints — TODO confirm against Flink docs.
val backend = new RocksDBStateBackend("file:///tmp/rocksdb-checkpoint", true);
env
.setStateBackend(backend)
.enableCheckpointing(settings.checkpointInterval())
.addSource(source)
// Key by the record's "key" field so each key gets its own window state.
.keyBy((record: GenericRecord) =>
record.get("key").asInstanceOf[String]
)
.timeWindow(Time.seconds(settings.windowTime()))
.allowedLateness(Time.seconds(settings.allowedLateness()))
// Custom trigger: fires at window end (event time) or after a processing-time delay.
.trigger(new DelayEventTimeTrigger())
// Collects a window's records into a single Iterable handed to the sink.
.apply(new GenericRecordAggregatorWindowFunction())
.addSink(writer)
env.execute()
// DelayEventTimeTrigger
/**
 * Copy of Flink's EventTimeTrigger that additionally registers a
 * processing-time timer one window-length in the future, so the window
 * also fires if processing time overtakes the watermark (e.g. a stalled
 * source that stops advancing event time).
 */
class DelayEventTimeTrigger extends Trigger[Object, TimeWindow] {

  // Per-window flag ensuring the two timers are registered only once per window.
  val valueStateDescriptor = new ValueStateDescriptor[Boolean]("flag", classOf[Boolean])

  override def onElement(
      element: Object,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext
  ): TriggerResult = {
    val flag = ctx.getPartitionedState(valueStateDescriptor).value()
    if (!flag) {
      // First element of this window: register one processing-time timer
      // (window length from now) and the usual end-of-window event-time timer.
      val delay = window.getEnd - window.getStart
      ctx.getPartitionedState(valueStateDescriptor).update(true)
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    // BUG FIX: the flag state was never cleared anywhere (the original comment
    // claimed it was cleared on FIRE, but no code did so). Without this, per-window
    // state leaks and, after a purge within the allowed-lateness period, no new
    // timers could ever be registered for the window.
    ctx.getPartitionedState(valueStateDescriptor).clear()
    // NOTE(review): the processing-time timer cannot be deleted here because its
    // timestamp (currentProcessingTime + delay at registration) is not stored;
    // a stray late FIRE on an already-purged window is possible — confirm acceptable.
    ctx.deleteEventTimeTimer(window.maxTimestamp)
  }
}
// GenericRecordAggregatorWindowFunction
class GenericRecordAggregatorWindowFunction
    extends WindowFunction[GenericRecord, Iterable[GenericRecord], String, TimeWindow] {

  /**
   * Emits the entire window content as one Scala Iterable so the downstream
   * sink can write a single Parquet file per (key, window) pair.
   */
  override def apply(key: String, window: TimeWindow, input: lang.Iterable[GenericRecord],
                     out: Collector[Iterable[GenericRecord]]): Unit = {
    // BUG FIX: materialize to an immutable List. Bare `input.asScala` is a lazy
    // view over the Java iterable backed by Flink's internal window buffer,
    // which may be reused/invalidated once apply() returns — iterating it later
    // in the sink could then observe stale or recycled records.
    out.collect(input.asScala.toList)
  }
}
// WindowParquetGenericRecordListFileSink
/**
 * Concrete Parquet sink: derives the bucket (Hive-style partition directory)
 * and file name from fields of the first GenericRecord of each batch.
 */
case class WindowParquetGenericRecordListFileSink(filePath: String, schema: String)
    extends ParquetGenericRecordListFileSink[GenericRecord] {

  /** Partition path: account_id=<key>/partition_date=<yyyyMM>. */
  def getBucketId(element: GenericRecord): String =
    "account_id=" + element.get(KEY.name).asInstanceOf[String] +
      "/partition_date=" + formatDateString(element.get(TIMESTAMP.name).asInstanceOf[String])

  // Collapses a date string starting with "yyyy-MM-dd" to "yyyyMM".
  // assumes the TIMESTAMP field begins with an ISO-8601 date, e.g. "2020-01-16T13:30:00Z" — TODO confirm
  private def formatDateString(date: String): String = {
    import java.time.LocalDate
    // BUG FIX: the previous SimpleDateFormat parse was lenient and silently
    // accepted impossible dates (e.g. month 13 rolled over into the next year);
    // java.time parses strictly and fails fast on malformed input.
    val parsed = LocalDate.parse(date.take(10))
    f"${parsed.getYear}%04d${parsed.getMonthValue}%02d"
  }

  /** File name from the record's "logger_timestamp", sanitized to be path-safe. */
  def getFileName(genericRecord: GenericRecord): String =
    genericRecord.get("logger_timestamp").asInstanceOf[String]
      .replace(" ", "_")
      .replace("/", "-")
      .replace(":", "-")
      .concat(".parquet")
}
// ParquetGenericRecordListFileSink
/**
 * Sink that writes one whole window batch to a single Parquet file.
 * The target path is `filePath/<bucketId>/<fileName>`, both derived from the
 * batch's first element by the implementing class.
 */
trait ParquetGenericRecordListFileSink[IN] extends SinkFunction[Iterable[IN]] with LazyLogging {

  /** Base output directory (e.g. an s3:// prefix). */
  def filePath: String

  /** Avro schema (JSON string) used for the Parquet file. */
  def schema: String

  override def invoke(elements: Iterable[IN], context: Context[_]): Unit = {
    // BUG FIX: guard against an empty batch — `elements.head` would throw
    // NoSuchElementException and fail the whole sink task.
    if (elements.isEmpty) {
      logger.warn("Received an empty batch, nothing to write")
    } else {
      val first = elements.head
      val finalFilePath = s"${filePath}/${getBucketId(first)}/${getFileName(first)}"
      // Log before writing (previously logged only after a successful close).
      logger.info(s"Writing batch to ${finalFilePath}")
      val writer = AvroParquetWriter
        .builder[IN](new Path(finalFilePath))
        .withSchema(new Schema.Parser().parse(schema))
        .withDataModel(GenericData.get)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()
      // BUG FIX: close in a finally block so the writer (and its underlying
      // output stream) is not leaked when a write throws mid-batch.
      try {
        elements.foreach(writer.write)
      } finally {
        writer.close()
      }
    }
  }

  /** Partition directory, relative to filePath, for this element's batch. */
  def getBucketId(element: IN): String

  /** File name, within the bucket, for this element's batch. */
  def getFileName(genericRecord: IN): String
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment