Last active
September 23, 2019 15:58
-
-
Save KKcorps/9f70a24334f1ef7f124b40f1d5080310 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); | |
ParameterTool configuration = ParameterTool.fromArgs(args); | |
FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), getKafkaConsumerProperties("testing123")); | |
DataStream<String> srcStream = env.addSource(kafkaConsumer010); | |
Random random = new Random(); | |
DataStream<String> outStream = srcStream | |
.map(row -> new KeyValue("testing" + random.nextInt(100000), row)) | |
.keyBy(row -> row.getKey()) | |
.process(new StatefulProcess()).name("stateful_process").uid("stateful_process") | |
.keyBy(row -> row.getKey()) | |
.flatMap(new StatefulMapTest()).name("stateful_map_test").uid("stateful_map_test"); | |
outStream.print(); | |
env.execute("Test Job"); | |
} | |
public static Properties getKafkaConsumerProperties(String groupId){ | |
Properties props = new Properties(); | |
props.setProperty("bootstrap.servers", "localhost:9092" | |
); | |
props.setProperty("group.id", groupId); | |
return props; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment