Skip to content

Instantly share code, notes, and snippets.

@alpinegizmo
Last active January 14, 2019 11:31
Show Gist options
  • Save alpinegizmo/1d747055953f172699ab72763814255d to your computer and use it in GitHub Desktop.
Save alpinegizmo/1d747055953f172699ab72763814255d to your computer and use it in GitHub Desktop.
package com.dataartisans.flinktraining.examples.datastream_java.basics;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
/**
 * Example Flink job that emits a rolling (ever-growing) count per word.
 *
 * <p>Pipeline, in order:
 * <ol>
 *   <li>read text lines from a file and parse each into a {@link TimestampedWord};</li>
 *   <li>assign event timestamps and emit a watermark after every element;</li>
 *   <li>count the occurrences of each word inside tumbling event-time windows;</li>
 *   <li>add every per-window count to a per-word running total kept in keyed state;</li>
 *   <li>print {@code (word, runningTotal, windowMaxTimestamp)} tuples.</li>
 * </ol>
 */
public class ExampleRollingAggregates {

    /** Input path used when none is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/input/file/with/words/and/timestamps";

    /** Length of the tumbling event-time windows, in milliseconds. */
    private static final long WINDOW_SIZE_MS = 10;

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} is the input file path. When absent,
     *             {@link #DEFAULT_INPUT_PATH} is used.
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(EventTime);

        String fileLocation = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        DataStreamSource<String> rawInput =
                env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

        rawInput.flatMap(parse())
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                    @Override
                    public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                        // A watermark is produced for every element, one tick behind its timestamp.
                        // NOTE(review): presumably the input is ordered by timestamp - confirm.
                        return new Watermark(extractedTimestamp - 1);
                    }

                    @Override
                    public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                        return element.getTimestamp();
                    }
                })
                .keyBy(TimestampedWord::getWord)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE_MS)))
                .process(new ProcessWindowFunction<TimestampedWord, Tuple3<String, Integer, Long>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<TimestampedWord> iterable, Collector<Tuple3<String, Integer, Long>> collector) throws Exception {
                        // Only the number of elements matters here, not their contents.
                        int cnt = 0;
                        for (TimestampedWord ignored : iterable) {
                            cnt++;
                        }
                        collector.collect(new Tuple3<>(key, cnt, context.window().maxTimestamp()));
                    }
                })
                // Re-key by word so the per-window counts of one word share the same keyed state.
                .keyBy(t -> t.f0)
                .process(new AggregateCounts())
                .print();

        env.execute();
    }

    /**
     * Adds each incoming per-window count ({@code f1}) to a running total stored in keyed state
     * and emits {@code (f0, runningTotal, f2)} for every input element.
     */
    public static class AggregateCounts extends KeyedProcessFunction<String, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>> {

        /** Running total for the current key; holds {@code null} until the first element is seen. */
        private transient ValueState<Integer> count;

        /** Obtains the state handle named {@code "counter"} from the runtime context. */
        @Override
        public void open(Configuration parameters) throws Exception {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
        }

        /**
         * Updates the running total with {@code t.f1} and forwards the new total.
         *
         * @param t       incoming tuple; {@code f1} is the amount added to the total
         * @param context process-function context (unused)
         * @param out     receives {@code (t.f0, newTotal, t.f2)}
         */
        @Override
        public void processElement(Tuple3<String, Integer, Long> t, Context context, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
            // Read the state once and write it once; the emitted value is the freshly written total.
            Integer previous = count.value();
            int updated = (previous == null ? 0 : previous) + t.f1;
            count.update(updated);
            out.collect(new Tuple3<>(t.f0, updated, t.f2));
        }
    }

    /**
     * Creates the line parser.
     *
     * <p>Each line must contain a word followed by a {@code long} timestamp, separated by
     * whitespace; any further fields are ignored. Blank lines are skipped.
     *
     * @return a function turning one text line into at most one {@link TimestampedWord}
     */
    private static FlatMapFunction<String, TimestampedWord> parse() {
        return new FlatMapFunction<String, TimestampedWord>() {
            @Override
            public void flatMap(String value, Collector<TimestampedWord> out) {
                String line = value.trim();
                if (line.isEmpty()) {
                    // Nothing to parse; emitting no element is why this is a flatMap.
                    return;
                }

                // Split on whitespace runs so repeated spaces or tabs do not create empty fields.
                String[] wordsAndTimes = line.split("\\s+");
                if (wordsAndTimes.length < 2) {
                    throw new IllegalArgumentException(
                            "Expected \"<word> <timestamp>\" but got: \"" + value + "\"");
                }

                long timestamp;
                try {
                    timestamp = Long.parseLong(wordsAndTimes[1]);
                } catch (NumberFormatException e) {
                    // Keep the cause, but say which line was bad.
                    throw new IllegalArgumentException(
                            "Timestamp is not a valid long in line: \"" + value + "\"", e);
                }
                out.collect(new TimestampedWord(wordsAndTimes[0], timestamp));
            }
        };
    }

    /** Immutable pair of a word and the timestamp parsed from the same input line. */
    private static class TimestampedWord {
        private final String word;
        private final long timestamp;

        private TimestampedWord(String word, long timestamp) {
            this.word = word;
            this.timestamp = timestamp;
        }

        /** @return the word; used as the stream key */
        public String getWord() {
            return word;
        }

        /** @return the timestamp; used as the element's event time */
        public long getTimestamp() {
            return timestamp;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment