Skip to content

Instantly share code, notes, and snippets.

@woj-i
Last active December 3, 2019 13:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save woj-i/b1dfbb71590b7f1c0c58be1f9e41c610 to your computer and use it in GitHub Desktop.
Save woj-i/b1dfbb71590b7f1c0c58be1f9e41c610 to your computer and use it in GitHub Desktop.
Reproduction of timestamp corruption in Flink Table API
/**
 * Minimal bean holding a single {@code long} value named {@code ts}.
 *
 * <p>The class keeps the classic bean shape on purpose: a public no-argument
 * constructor plus a getter/setter pair for the private field. The field name
 * {@code ts} is referenced by name from the job that consumes this class, so it
 * must not be renamed.
 */
public class PojoForTs {

    /** The wrapped value; a freshly constructed instance holds {@code 0}. */
    private long ts;

    /** Creates an instance with {@code ts} left at its default value. */
    public PojoForTs() {
    }

    /**
     * @return the current value of {@code ts}
     */
    public long getTs() {
        return ts;
    }

    /**
     * Overwrites the stored value.
     *
     * @param ts the new value of {@code ts}
     */
    public void setTs(long ts) {
        this.ts = ts;
    }
}
import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
/**
 * Self-contained reproduction of a timestamp discrepancy in the Flink Table API.
 *
 * A single [[PojoForTs]] with `ts = 1000` is fed through a stream whose event
 * timestamps are taken from that field, registered as a table with `ts` declared
 * as the rowtime attribute, queried back with SQL and converted to [[TsCorruption.Result]].
 *
 * Reported observation (from the original author): the value of
 * `Result.ts.getTime` is -3599000 on a machine in the GMT+1 time zone, whereas
 * 1000 is expected regardless of time zone.
 * NOTE(review): the difference is exactly one hour, so this presumably comes from
 * a local-time-zone conversion when the rowtime is materialised as
 * `java.sql.Timestamp` -- confirm against the Flink version in use.
 */
object TsCorruption {

  /** Row type the query result is converted into; `ts` holds the rowtime column. */
  case class Result(ts: Timestamp)

  /**
   * Builds the pipeline, prints both the resulting rows and their epoch
   * milliseconds, and runs the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The one and only input element, stamped with ts = 1000.
    val myPojo = new PojoForTs
    myPojo.setTs(1000L)

    val env = createEventTimeEnv

    // Event time is read straight from the POJO, and every element emits a
    // watermark equal to its own extracted timestamp.
    val assigner: AssignerWithPunctuatedWatermarks[PojoForTs] =
      new AssignerWithPunctuatedWatermarks[PojoForTs] {
        override def extractTimestamp(element: PojoForTs, previousElementTimestamp: Long): Long =
          element.getTs

        override def checkAndGetNextWatermark(lastElement: PojoForTs, extractedTimestamp: Long): Watermark =
          new Watermark(extractedTimestamp)
      }

    val source = env.fromElements(myPojo)
    val pojoStream: DataStream[PojoForTs] = source.assignTimestampsAndWatermarks(assigner)

    // Expose the stream as a table, with `ts` declared as the rowtime attribute.
    val tableEnvironment = StreamTableEnvironment.create(env)
    tableEnvironment.registerDataStream("corruptedTable", pojoStream, 'ts.rowtime)

    val queried = tableEnvironment.sqlQuery("SELECT * FROM corruptedTable")
    val resStream = queried.toAppendStream[Result]

    // First sink: the rows themselves.
    resStream.print()

    // Second sink: epoch milliseconds of each row's timestamp.
    // is -3599000 (on my PC at GMT+1 timezone), should be 1000 (independent of time zone)
    val epochMillis = resStream.map(row => row.ts.getTime)
    epochMillis.print()

    env.execute("Timestamp Corruption Test")
  }

  /**
   * Obtains the execution environment, switched to event time and with the
   * auto-watermark interval set to 200.
   */
  private def createEventTimeEnv: StreamExecutionEnvironment = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.getConfig.setAutoWatermarkInterval(200)
    environment
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment