Skip to content

Instantly share code, notes, and snippets.

@speeddragon
Created January 10, 2020 02:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save speeddragon/ea19cb07569a52cd78fad8d4af8c9e68 to your computer and use it in GitHub Desktop.
Save speeddragon/ea19cb07569a52cd78fad8d4af8c9e68 to your computer and use it in GitHub Desktop.
StreamingFileSink with Custom Rolling Policy
/**
 * Builds a row-format [[StreamingFileSink]] for Avro [[GenericRecord]]s. The sink starts a new
 * part file whenever a record's `timestamp` falls into a different fixed-size window than the
 * previous record seen for the same bucket.
 *
 * @param pathString base output path of the sink
 * @param schema     schema string; not read by the code below (presumably meant for the pending
 *                   Parquet encoder — TODO confirm)
 */
case class Writer(pathString: String, schema: String) {

  /**
   * Assembles the sink from three parts: `ParquetEncoder` for records, `MyRollingPolicy` with
   * 60 000 ms windows for rolling, and `MessageBucketAssigner` for bucket selection.
   *
   * @return the configured sink
   */
  def build(): StreamingFileSink[GenericRecord] = {
    val path = new Path(pathString)
    val encoder = new ParquetEncoder()
    StreamingFileSink
      .forRowFormat(path, encoder)
      .withRollingPolicy(new MyRollingPolicy(milliseconds = 60000L))
      .withBucketAssigner(new MessageBucketAssigner)
      .build()
  }

  /**
   * Rolls a bucket's in-progress part file when an incoming record's event-time window differs
   * from the window of the previous record this policy saw for the same bucket. Rolling on
   * checkpoint and on processing time is disabled.
   *
   * Windows are tumbling, `milliseconds` wide and aligned with offset 0 via
   * `TimeWindow.getWindowStartWithOffset`.
   *
   * The last-seen window is tracked per bucket id. A single shared value would let records of
   * one bucket decide rolling for another (the sink hands this one policy to all its buckets —
   * NOTE(review): confirm for the Flink version in use).
   *
   * NOTE(review): this state is not checkpointed and entries are never evicted, so the map grows
   * with the number of distinct bucket ids. It is also unsynchronized and assumes calls come
   * from one thread at a time, as the original `var` already did.
   *
   * @param milliseconds window size in milliseconds
   */
  class MyRollingPolicy(milliseconds: Long) extends RollingPolicy[GenericRecord, String] {

    /**
     * Window end of the most recent record seen across all buckets (`Long.MinValue` before the
     * first one). Kept for compatibility only; rolling decisions use `windowEndByBucket`.
     */
    var lastWindowEnd: Long = Long.MinValue

    /** Last-seen window end, keyed by bucket id. */
    private val windowEndByBucket = scala.collection.mutable.HashMap.empty[String, Long]

    /**
     * Parser for the record's `timestamp` field (JVM default time zone). It is created once per
     * policy instead of once per record. SimpleDateFormat is not thread-safe; see the class note.
     */
    @transient private lazy val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean = false

    override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
      val windowEnd = windowEndOf(eventTimeMillis(element))
      val bucketId = partFileState.getBucketId
      // The first record seen for a bucket never rolls; after that, roll on a window change.
      val roll = windowEndByBucket.get(bucketId).exists(_ != windowEnd)
      windowEndByBucket.update(bucketId, windowEnd)
      lastWindowEnd = windowEnd
      roll
    }

    override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = false

    /**
     * Reads the record's `timestamp` field (`yyyy-MM-dd HH:mm:ss`) as epoch milliseconds.
     *
     * @throws IllegalArgumentException if the field value is null
     * @throws java.text.ParseException if the value does not match the pattern
     */
    private def eventTimeMillis(element: GenericRecord): Long = {
      val raw = element.get("timestamp")
      if (raw == null)
        throw new IllegalArgumentException("GenericRecord has no value for field 'timestamp'")
      // Avro may hand back org.apache.avro.util.Utf8 rather than java.lang.String for string
      // fields, so convert with toString instead of casting.
      timestampFormat.parse(raw.toString).getTime
    }

    /** End (exclusive) of the tumbling window that contains `timestampMillis`. */
    private def windowEndOf(timestampMillis: Long): Long =
      TimeWindow.getWindowStartWithOffset(timestampMillis, 0, milliseconds) + milliseconds
  }

  /**
   * Placeholder row encoder intended to produce Parquet output.
   *
   * NOTE(review): `encode` is still a stub and writes nothing to `stream`, so no record data
   * reaches the part files. Parquet is columnar and is usually written through a bulk format
   * (`StreamingFileSink.forBulkFormat`) rather than a row `Encoder`. Check what rolling policies
   * the bulk path allows in the Flink version in use before switching.
   */
  class ParquetEncoder extends Encoder[GenericRecord] {
    override def encode(element: GenericRecord, stream: OutputStream): Unit = {
      // TODO: Write to Parquet
      // TODO: Read from Parquet and write to OutputStream
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment