Last active
January 14, 2019 11:31
-
-
Save alpinegizmo/1d747055953f172699ab72763814255d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dataartisans.flinktraining.examples.datastream_java.basics; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.common.state.ValueState; | |
import org.apache.flink.api.common.state.ValueStateDescriptor; | |
import org.apache.flink.api.java.io.TextInputFormat; | |
import org.apache.flink.api.java.tuple.Tuple3; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.core.fs.Path; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; | |
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; | |
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.util.Collector; | |
import java.util.Iterator; | |
import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime; | |
public class ExampleRollingAggregates { | |
public static void main(String[] args) throws Exception { | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(EventTime); | |
String fileLocation = "/input/file/with/words/and/timestamps"; | |
DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation); | |
rawInput.flatMap(parse()) | |
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() { | |
@Override | |
public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) { | |
return new Watermark(extractedTimestamp - 1); | |
} | |
@Override | |
public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) { | |
return element.getTimestamp(); | |
} | |
}) | |
.keyBy(TimestampedWord::getWord) | |
.window(TumblingEventTimeWindows.of(Time.milliseconds(10))) | |
.process(new ProcessWindowFunction<TimestampedWord, Tuple3<String, Integer, Long>, String, TimeWindow>() { | |
@Override | |
public void process(String key, Context context, Iterable<TimestampedWord> iterable, Collector<Tuple3<String, Integer, Long>> collector) throws Exception { | |
int cnt = 0; | |
Iterator<TimestampedWord> i = iterable.iterator(); | |
for (; i.hasNext(); i.next()) cnt++; | |
collector.collect(new Tuple3<>(key, cnt, context.window().maxTimestamp())); | |
} | |
}) | |
.keyBy(t -> t.f0) | |
.process(new AggregateCounts()) | |
.print(); | |
env.execute(); | |
} | |
public static class AggregateCounts extends KeyedProcessFunction<String, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>> { | |
private transient ValueState<Integer> count; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class)); | |
} | |
@Override | |
public void processElement(Tuple3<String, Integer, Long> t, Context context, Collector<Tuple3<String, Integer, Long>> out) throws Exception { | |
if (count.value() == null) { | |
count.update(0); | |
} | |
count.update(count.value() + t.f1); | |
out.collect(new Tuple3<>(t.f0, count.value(), t.f2)); | |
} | |
} | |
private static FlatMapFunction<String, TimestampedWord> parse() { | |
return new FlatMapFunction<String, TimestampedWord>() { | |
@Override | |
public void flatMap(String value, Collector<TimestampedWord> out) { | |
String[] wordsAndTimes = value.split(" "); | |
out.collect(new TimestampedWord(wordsAndTimes[0], Long.parseLong(wordsAndTimes[1]))); | |
} | |
}; | |
} | |
private static class TimestampedWord { | |
private final String word; | |
private final long timestamp; | |
private TimestampedWord(String word, long timestamp) { | |
this.word = word; | |
this.timestamp = timestamp; | |
} | |
public String getWord() { | |
return word; | |
} | |
public long getTimestamp() { | |
return timestamp; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment