Skip to content

Instantly share code, notes, and snippets.

@mustafaakin
Last active March 19, 2022 16:32
Show Gist options
  • Star 16 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mustafaakin/457859b8bf703c64029071c1139b593d to your computer and use it in GitHub Desktop.
Save mustafaakin/457859b8bf703c64029071c1139b593d to your computer and use it in GitHub Desktop.
Flink Streaming SQL Example
objc[3232]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/bin/java (0x1008de4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1009a64e0). One of the two will be used. Which one is undefined.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1862072598] with leader session id 10afca54-e5f2-4b30-931f-b1df0275adfe.
07/02/2017 11:51:33 Job execution switched to status RUNNING.
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to SCHEDULED
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to SCHEDULED
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to DEPLOYING
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to DEPLOYING
07/02/2017 11:51:33 Source: Socket Stream -> Map -> Timestamps/Watermarks -> from: (room, temperature, creationDate, rowtime) -> select: (rowtime, room, temperature)(1/1) switched to RUNNING
07/02/2017 11:51:33 groupBy: (room), window: (TumblingGroupWindow('w$, 'rowtime, 10000.millis)), select: (room, AVG(temperature) AS avgTemp, start('w$) AS w$start, end('w$) AS w$end) -> select: (room, w$end, avgTemp) -> to: Row -> Sink: Unnamed(1/1) switched to RUNNING
Accepted connection
living room,2017-07-02 08:51:40.0,36.7043450149447
kitchen,2017-07-02 08:51:40.0,33.876180051046205
attic,2017-07-02 08:51:40.0,36.16675359462062
outside,2017-07-02 08:51:40.0,31.82492651162825
bedroom,2017-07-02 08:51:40.0,35.57564839912154
kitchen,2017-07-02 08:51:50.0,35.17374622822952
attic,2017-07-02 08:51:50.0,34.70059246571888
bedroom,2017-07-02 08:51:50.0,33.42090378358368
outside,2017-07-02 08:51:50.0,36.28734383828417
living room,2017-07-02 08:51:50.0,35.69019700021718
kitchen,2017-07-02 08:52:00.0,36.73052318978358
living room,2017-07-02 08:52:00.0,37.6541246537134
attic,2017-07-02 08:52:00.0,33.20183220572221
outside,2017-07-02 08:52:00.0,35.79210457006139
bedroom,2017-07-02 08:52:00.0,32.869989177833276
....
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>mustafaakin</groupId>
<artifactId>streaming-sql-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>testCompile</id>
<phase>test-compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.sql.Time;
import java.util.Random;
/**
* Created by mustafa on 01/07/2017.
*/
public class SQLTester {
private static void listenAndGenerateNumbers(int port) {
try {
ServerSocket serverSocket = new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
System.out.println("Accepted connection");
Random random = new Random();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
String rooms[] = new String[]{"living room", "kitchen", "outside", "bedroom", "attic"};
for (int i = 0; i < 10000; i++) {
String room = rooms[random.nextInt(rooms.length)];
double temp = random.nextDouble() * 30 + 20;
out.println(room + "," + temp);
Thread.sleep(random.nextInt(10) + 50);
}
System.out.println("Closing server");
clientSocket.close();
serverSocket.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
private static final MapFunction<String, Tuple3<String, Double, Time>> mapFunction
= new MapFunction<String, Tuple3<String, Double, Time>>() {
@Override
public Tuple3<String, Double, Time> map(String s) throws Exception {
// data is: <roomname>,<temperature>
String p[] = s.split(",");
String room = p[0];
Double temperature = Double.parseDouble(p[1]);
Time creationDate = new Time(System.currentTimeMillis());
return new Tuple3<>(room, temperature, creationDate);
}
};
private final static AscendingTimestampExtractor extractor = new AscendingTimestampExtractor<Tuple3<String, Double, Time>>() {
@Override
public long extractAscendingTimestamp(Tuple3<String, Double, Time> element) {
return element.f2.getTime();
}
};
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final int port = 3901;
new Thread(() -> listenAndGenerateNumbers(port)).start();
Thread.sleep(1000); // wait the socket for a little;
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
DataStream<Tuple3<String, Double, Time>> dataset = text
.map(mapFunction)
.assignTimestampsAndWatermarks(extractor);
// Register it so we can use it in SQL
tableEnv.registerDataStream("sensors", dataset, "room, temperature, creationDate, rowtime.rowtime");
String query = "SELECT room, TUMBLE_END(rowtime, INTERVAL '10' SECOND), AVG(temperature) AS avgTemp FROM sensors GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), room";
Table table = tableEnv.sql(query);
// Just for printing purposes, in reality you would need something other than Row
tableEnv.toAppendStream(table, Row.class).print();
env.execute();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment