Skip to content

Instantly share code, notes, and snippets.

@ldenson11
Created November 21, 2019 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ldenson11/1fb585fe3cef4959e41a43d95ec5b8db to your computer and use it in GitHub Desktop.
Save ldenson11/1fb585fe3cef4959e41a43d95ec5b8db to your computer and use it in GitHub Desktop.
tableEnv.connect(new Kafka()
.version("0.11")
.topic(params.getRequired("read-topic"))
.property("bootstrap.servers", params.getRequired("bootstrap.servers")))
.withSchema(new Schema()
.field("sensor", Types.STRING())
.field("temp", Types.LONG())
.field("ts", Types.SQL_TIMESTAMP())
.rowtime(new Rowtime()
.timestampsFromSource()
.watermarksPeriodicBounded(1000)
)
)
.withFormat(new Json().deriveSchema())
.inAppendMode()
.registerTableSource("sourceTopic");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment