Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1 to your computer and use it in GitHub Desktop.
Save tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1 to your computer and use it in GitHub Desktop.
package org.stsffap;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class StreamingJob {
// Offset added to a window's start in ElementTimeWindowTrigger.onElement: the early firing
// only happens for an element whose timestamp is strictly greater than start + this value.
// NOTE(review): presumably milliseconds, like the element timestamps it is compared with - confirm.
private static final long MIN_WINDOW_SIZE = 900L;
/**
 * Builds and runs the demo pipeline: a fixed list of timestamped elements is keyed
 * onto a single key, grouped into event-time session windows with a 5 second gap,
 * evaluated with {@link ElementTimeWindowTrigger} and {@link MyWindowFunction},
 * and the resulting strings are printed.
 *
 * @param args command line arguments (unused)
 * @throws Exception if the job execution fails
 */
public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Takes the timestamp from the element itself and returns a watermark equal to
    // each element's timestamp.
    final AssignerWithPunctuatedWatermarks<Element> timestampAssigner = new AssignerWithPunctuatedWatermarks<Element>() {
        @Override
        public long extractTimestamp(Element element, long previousElementTimestamp) {
            return element.getTimestamp();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
            return new Watermark(lastElement.getTimestamp());
        }
    };

    // Every element is mapped to the same constant key.
    final KeySelector<Element, Integer> constantKey = new KeySelector<Element, Integer>() {
        @Override
        public Integer getKey(Element element) throws Exception {
            return 0;
        }
    };

    final DataStream<Element> elements = env
        .fromElements(
            Element.from("1", 1000L),
            Element.from("2", 2000L),
            Element.from("3", 3000L),
            Element.from("10", 10000L),
            Element.from("11", 11000L),
            Element.from("12", 12000L),
            Element.from("20", 20000L))
        .assignTimestampsAndWatermarks(timestampAssigner);

    final DataStream<String> windowOutput = elements
        .keyBy(constantKey)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5L)))
        .trigger(new ElementTimeWindowTrigger())
        .process(new MyWindowFunction())
        .setParallelism(1);

    windowOutput.print().setParallelism(1);

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
}
/**
 * Window function that distinguishes the first firing of a window from later firings.
 *
 * <p>A flag per window start is kept in the keyed global state. On the first firing seen
 * for a given window start it emits {@code "Window started @ <start>"} and sets the flag;
 * on every later firing it emits the concatenated element values followed by
 * {@code "Window ended @ <end>"}.
 *
 * <p>The flag is removed again in {@link #clear(Context)} so that the global state does not
 * accumulate one entry per window forever.
 */
public static class MyWindowFunction extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
    private static final MapStateDescriptor<Long, Boolean> STATE_DESCRIPTOR = new MapStateDescriptor<Long, Boolean>("state", Long.class, Boolean.class);

    /**
     * Emits either the "started" marker or the window contents plus the "ended" marker.
     *
     * @param integer   the key of the window (unused)
     * @param context   gives access to the window and to the keyed global state
     * @param iterable  the elements currently in the window
     * @param collector receives the output strings
     * @throws Exception if state access fails
     */
    @Override
    public void process(Integer integer, Context context, Iterable<Element> iterable, Collector<String> collector) throws Exception {
        final MapState<Long, Boolean> startedByWindowStart = context.globalState().getMapState(STATE_DESCRIPTOR);
        final TimeWindow timeWindow = context.window();
        final long windowStart = timeWindow.getStart();

        // A missing entry and an explicit FALSE both mean "not started yet".
        final boolean alreadyStarted = Boolean.TRUE.equals(startedByWindowStart.get(windowStart));
        if (!alreadyStarted) {
            collector.collect(String.format("Window started @ %s", windowStart));
            startedByWindowStart.put(windowStart, true);
            return;
        }

        final StringBuilder builder = new StringBuilder();
        for (Element element : iterable) {
            builder.append(element.getValue()).append("_");
        }
        collector.collect(builder.toString());
        collector.collect(String.format("Window ended @ %s", timeWindow.getEnd()));
    }

    /**
     * Removes the "started" flag of the window that is being cleaned up. Without this the
     * global map state would keep one entry per window start for the lifetime of the key.
     *
     * @param context gives access to the expiring window and to the keyed global state
     * @throws Exception if state access fails
     */
    @Override
    public void clear(Context context) throws Exception {
        // NOTE(review): only the final window's start is removed; if a window's start moved
        // during a session merge, the flag stored under the earlier start is not cleaned here.
        context.globalState().getMapState(STATE_DESCRIPTOR).remove(context.window().getStart());
    }
}
/**
 * Immutable pair of a string value and the event timestamp it belongs to.
 */
public static class Element {
    private final long timestamp;
    private final String value;

    /**
     * Factory with the value first and the timestamp second (the reverse of the constructor).
     *
     * @param value     the payload
     * @param timestamp the event timestamp
     * @return a new element holding both
     */
    public static Element from(String value, long timestamp) {
        return new Element(timestamp, value);
    }

    /**
     * @param timestamp the event timestamp
     * @param value     the payload
     */
    public Element(long timestamp, String value) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /** @return the payload */
    public String getValue() {
        return this.value;
    }

    /** @return the event timestamp */
    public long getTimestamp() {
        return this.timestamp;
    }
}
/**
 * Marker values stored in the trigger's per-window state.
 */
enum State {
    /** Added by ElementTimeWindowTrigger.onElement when it returns the early FIRE for a window. */
    FIRST
}
/**
 * Trigger that fires a window at most once early and once when event time reaches the
 * window's max timestamp.
 *
 * <p>The early firing happens for the first element whose timestamp is strictly greater than
 * {@code window.getStart() + MIN_WINDOW_SIZE}; that it happened is remembered in per-window
 * trigger state. The final firing happens in {@link #onEventTime} when the timer time equals
 * {@code window.maxTimestamp()}.
 *
 * <p>Timer handling follows the same contract as Flink's built-in event-time trigger: the
 * timer is registered at {@code maxTimestamp()} (the time {@link #onEventTime} checks for),
 * deleted in {@link #clear}, and re-registered for the merged window in {@link #onMerge}.
 */
public static class ElementTimeWindowTrigger extends Trigger<Element, TimeWindow> {
    // Reducing state that keeps the greater of two State constants (enum natural order).
    private static final ReducingStateDescriptor<State> STATE_DESCRIPTOR = new ReducingStateDescriptor<State>("State", new ReduceFunction<State>() {
        @Override
        public State reduce(State state, State t1) throws Exception {
            return state.compareTo(t1) > 0 ? state : t1;
        }
    }, State.class);

    /**
     * Registers the end-of-window timer and decides whether this element causes the early firing.
     *
     * @param element        the element that was added
     * @param l              the timestamp of the element
     * @param timeWindow     the window the element was added to
     * @param triggerContext access to timers and per-window trigger state
     * @return {@code FIRE} for the first element past the minimum window size, otherwise {@code CONTINUE}
     * @throws Exception if state access fails
     */
    @Override
    public TriggerResult onElement(Element element, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // Must be the exact time onEventTime() compares against, otherwise the final firing
        // depends on some other timer happening to be set for maxTimestamp().
        triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());

        if (l <= timeWindow.getStart() + MIN_WINDOW_SIZE) {
            return TriggerResult.CONTINUE;
        }

        final ReducingState<State> firedState = triggerContext.getPartitionedState(STATE_DESCRIPTOR);
        if (firedState.get() != null) {
            // The early firing already happened for this window.
            return TriggerResult.CONTINUE;
        }
        firedState.add(State.FIRST);
        return TriggerResult.FIRE;
    }

    /** Processing time is not used by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires the window when the timer for its max timestamp goes off and resets the
     * early-firing marker.
     *
     * @param l              the time of the firing timer
     * @param timeWindow     the window the timer belongs to
     * @param triggerContext access to per-window trigger state
     * @return {@code FIRE} at the window's max timestamp, otherwise {@code CONTINUE}
     * @throws Exception if state access fails
     */
    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        if (l != timeWindow.maxTimestamp()) {
            return TriggerResult.CONTINUE;
        }
        triggerContext.getPartitionedState(STATE_DESCRIPTOR).clear();
        return TriggerResult.FIRE;
    }

    /**
     * Releases everything this trigger holds for the window: its end-of-window timer and
     * its early-firing marker.
     *
     * @param timeWindow     the window being cleared
     * @param triggerContext access to timers and per-window trigger state
     * @throws Exception if state access fails
     */
    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
        triggerContext.getPartitionedState(STATE_DESCRIPTOR).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Carries the early-firing marker over to the merged window and registers the
     * end-of-window timer for it.
     *
     * @param window the window resulting from the merge
     * @param ctx    access to state merging, timers and the current watermark
     * @throws Exception if state access fails
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(STATE_DESCRIPTOR);

        // Only set a timer if the watermark has not yet passed the merged window's max timestamp.
        final long mergedMaxTimestamp = window.maxTimestamp();
        if (mergedMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(mergedMaxTimestamp);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment