Skip to content

Instantly share code, notes, and snippets.

Last active March 19, 2022 16:32
Show Gist options
  • Save mustafaakin/457859b8bf703c64029071c1139b593d to your computer and use it in GitHub Desktop.
Save mustafaakin/457859b8bf703c64029071c1139b593d to your computer and use it in GitHub Desktop.
Flink Streaming SQL Example
objc[3232]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java (0x1008de4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1009a64e0). One of the two will be used. Which one is undefined.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1862072598] with leader session id 10afca54-e5f2-4b30-931f-b1df0275adfe.
07/02/2017 11:51:33 Job execution switched to status RUNNING.
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to SCHEDULED
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to SCHEDULED
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to DEPLOYING
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to DEPLOYING
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to RUNNING
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to RUNNING
Accepted connection
living room,2017-07-02 08:51:40.0,36.7043450149447
kitchen,2017-07-02 08:51:40.0,33.876180051046205
attic,2017-07-02 08:51:40.0,36.16675359462062
outside,2017-07-02 08:51:40.0,31.82492651162825
bedroom,2017-07-02 08:51:40.0,35.57564839912154
kitchen,2017-07-02 08:51:50.0,35.17374622822952
attic,2017-07-02 08:51:50.0,34.70059246571888
bedroom,2017-07-02 08:51:50.0,33.42090378358368
outside,2017-07-02 08:51:50.0,36.28734383828417
living room,2017-07-02 08:51:50.0,35.69019700021718
kitchen,2017-07-02 08:52:00.0,36.73052318978358
living room,2017-07-02 08:52:00.0,37.6541246537134
attic,2017-07-02 08:52:00.0,33.20183220572221
outside,2017-07-02 08:52:00.0,35.79210457006139
bedroom,2017-07-02 08:52:00.0,32.869989177833276
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=""
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import java.sql.Time;
import java.util.Random;
* Created by mustafa on 01/07/2017.
public class SQLTester {
private static void listenAndGenerateNumbers(int port) {
try {
ServerSocket serverSocket = new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
System.out.println("Accepted connection");
Random random = new Random();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
String rooms[] = new String[]{"living room", "kitchen", "outside", "bedroom", "attic"};
for (int i = 0; i < 10000; i++) {
String room = rooms[random.nextInt(rooms.length)];
double temp = random.nextDouble() * 30 + 20;
out.println(room + "," + temp);
Thread.sleep(random.nextInt(10) + 50);
System.out.println("Closing server");
} catch (Exception ex) {
private static final MapFunction<String, Tuple3<String, Double, Time>> mapFunction
= new MapFunction<String, Tuple3<String, Double, Time>>() {
public Tuple3<String, Double, Time> map(String s) throws Exception {
// data is: <roomname>,<temperature>
String p[] = s.split(",");
String room = p[0];
Double temperature = Double.parseDouble(p[1]);
Time creationDate = new Time(System.currentTimeMillis());
return new Tuple3<>(room, temperature, creationDate);
private final static AscendingTimestampExtractor extractor = new AscendingTimestampExtractor<Tuple3<String, Double, Time>>() {
public long extractAscendingTimestamp(Tuple3<String, Double, Time> element) {
return element.f2.getTime();
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
final int port = 3901;
new Thread(() -> listenAndGenerateNumbers(port)).start();
Thread.sleep(1000); // wait the socket for a little;
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
DataStream<Tuple3<String, Double, Time>> dataset = text
// Register it so we can use it in SQL
tableEnv.registerDataStream("sensors", dataset, "room, temperature, creationDate, rowtime.rowtime");
String query = "SELECT room, TUMBLE_END(rowtime, INTERVAL '10' SECOND), AVG(temperature) AS avgTemp FROM sensors GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), room";
Table table = tableEnv.sql(query);
// Just for printing purposes, in reality you would need something other than Row
tableEnv.toAppendStream(table, Row.class).print();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment