Skip to content

Instantly share code, notes, and snippets.

@mustafaakin
Created July 2, 2017 09:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mustafaakin/b2f5a01c8515b5a0f89696cedc28ac03 to your computer and use it in GitHub Desktop.
Save mustafaakin/b2f5a01c8515b5a0f89696cedc28ac03 to your computer and use it in GitHub Desktop.
// A data stream, in reality it would be Kafka, Kinesis or other streams
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// Format the streaming data into a row format
DataStream<Tuple3<String, Double, Time>> dataset = text
.map(mapFunction)
.assignTimestampsAndWatermarks(extractor);
// Register it so we can refer it as 'sensors' in SQL
tableEnv.registerDataStream("sensors", dataset, "room, temperature, creationDate, rowtime.rowtime");
String query = "SELECT room, TUMBLE_END(rowtime, INTERVAL '10' SECOND), AVG(temperature) AS avgTemp FROM sensors GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), room";
Table table = tableEnv.sql(query);
// Just for printing purposes, in reality you would need something other than row
// a more formatted object, so that you can chain it through for any other purpose
tableEnv.toAppendStream(table, Row.class).print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment