Last active
February 25, 2019 19:12
-
-
Save staticor/bdca5a5281de3afd3e0e84876a4c7b64 to your computer and use it in GitHub Desktop.
First Flink application: a word count example.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myflink; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.util.Collector; | |
public class SocketWindowWordCount { | |
public static void main(String[] args) throws Exception { | |
// 创建 execution environment | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口 | |
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); | |
// 解析数据,按 word 分组,开窗,聚合 | |
DataStream<Tuple2<String, Integer>> windowCounts = text | |
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { | |
@Override | |
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { | |
for (String word : value.split("\\s")) { | |
out.collect(Tuple2.of(word, 1)); | |
} | |
} | |
}) | |
.keyBy(0) | |
.timeWindow(Time.seconds(5)) | |
.sum(1); | |
// 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程 | |
windowCounts.print().setParallelism(1); | |
// 新打开 Shell 执行: nc -lk 9000 | |
// 在Terminal 输入要统计的 文字流。 | |
env.execute("Socket Window WordCount"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myflink; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.java.DataSet; | |
import org.apache.flink.api.java.ExecutionEnvironment; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.util.Collector; | |
public class WordCountExample { | |
public static void main(String[] args) throws Exception { | |
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
DataSet<String> text = env.fromElements( | |
"Who's there?", | |
"I think I hear them. Stand, ho! Who's there?"); | |
DataSet<Tuple2<String, Integer>> wordCounts = text | |
.flatMap(new LineSplitter()) | |
.groupBy(0) | |
.sum(1); | |
wordCounts.print(); | |
} | |
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { | |
@Override | |
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { | |
for (String word : line.split(" ")) { | |
out.collect(new Tuple2<String, Integer>(word, 1)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment