Created March 26, 2020 15:52
-
-
Save tillrohrmann/dda90b8b0e67e379a8dfee967fbd9af1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.stsffap; | |
import org.apache.flink.api.common.functions.ReduceFunction; | |
import org.apache.flink.api.common.state.MapState; | |
import org.apache.flink.api.common.state.MapStateDescriptor; | |
import org.apache.flink.api.common.state.ReducingState; | |
import org.apache.flink.api.common.state.ReducingStateDescriptor; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; | |
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger; | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.util.Collector; | |
import javax.annotation.Nullable; | |
public class StreamingJob { | |
private static final long MIN_WINDOW_SIZE = 900L; | |
public static void main(String[] args) throws Exception { | |
// set up the streaming execution environment | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
final DataStream<Element> elements = env.fromElements( | |
Element.from("1", 1000L), | |
Element.from("2", 2000L), | |
Element.from("3", 3000L), | |
Element.from("10", 10000L), | |
Element.from("11", 11000L), | |
Element.from("12", 12000L), | |
Element.from("20", 20000L)).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Element>() { | |
@Nullable | |
@Override | |
public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) { | |
return new Watermark(lastElement.timestamp); | |
} | |
@Override | |
public long extractTimestamp(Element element, long previousElementTimestamp) { | |
return element.getTimestamp(); | |
} | |
}); | |
elements | |
.keyBy(new KeySelector<Element, Integer>() { | |
@Override | |
public Integer getKey(Element element) throws Exception { | |
return 0; | |
} | |
}) | |
.window(EventTimeSessionWindows.withGap(Time.seconds(5L))) | |
.trigger(new ElementTimeWindowTrigger()) | |
.process(new MyWindowFunction()).setParallelism(1).print().setParallelism(1); | |
// execute program | |
env.execute("Flink Streaming Java API Skeleton"); | |
} | |
public static class MyWindowFunction extends ProcessWindowFunction<Element, String, Integer, TimeWindow> { | |
private static final MapStateDescriptor<Long, Boolean> STATE_DESCRIPTOR = new MapStateDescriptor<Long, Boolean>("state", Long.class, Boolean.class); | |
@Override | |
public void process(Integer integer, Context context, Iterable<Element> iterable, Collector<String> collector) throws Exception { | |
final MapState<Long, Boolean> mapState = context.globalState().getMapState(STATE_DESCRIPTOR); | |
final TimeWindow timeWindow = context.window(); | |
final Boolean state = mapState.get(timeWindow.getStart()); | |
if (state == null || !state) { | |
collector.collect(String.format("Window started @ %s", timeWindow.getStart())); | |
mapState.put(timeWindow.getStart(), true); | |
} else { | |
final StringBuilder builder = new StringBuilder(); | |
for (Element element : iterable) { | |
builder.append(element.getValue()).append("_"); | |
} | |
collector.collect(builder.toString()); | |
collector.collect(String.format("Window ended @ %s", timeWindow.getEnd())); | |
} | |
} | |
} | |
public static class Element { | |
private final long timestamp; | |
private final String value; | |
public Element(long timestamp, String value) { | |
this.timestamp = timestamp; | |
this.value = value; | |
} | |
public long getTimestamp() { | |
return timestamp; | |
} | |
public String getValue() { | |
return value; | |
} | |
public static Element from(String value, long timestamp) { | |
return new Element(timestamp, value); | |
} | |
} | |
enum State { | |
FIRST, | |
} | |
public static class ElementTimeWindowTrigger extends Trigger<Element, TimeWindow> { | |
private static final ReducingStateDescriptor<State> STATE_DESCRIPTOR = new ReducingStateDescriptor<State>("State", new ReduceFunction<State>() { | |
@Override | |
public State reduce(State state, State t1) throws Exception { | |
return state.compareTo(t1) > 0 ? state : t1; | |
} | |
}, State.class); | |
@Override | |
public TriggerResult onElement(Element element, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
final ReducingState<State> valueState = triggerContext.getPartitionedState(STATE_DESCRIPTOR); | |
triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp() + 1); | |
if (l > timeWindow.getStart() + MIN_WINDOW_SIZE) { | |
State state = valueState.get(); | |
if (state == null) { | |
valueState.add(State.FIRST); | |
return TriggerResult.FIRE; | |
} else { | |
return TriggerResult.CONTINUE; | |
} | |
} else{ | |
return TriggerResult.CONTINUE; | |
} | |
} | |
@Override | |
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
if (l == timeWindow.maxTimestamp()) { | |
triggerContext.getPartitionedState(STATE_DESCRIPTOR).clear(); | |
return TriggerResult.FIRE; | |
} else { | |
return TriggerResult.CONTINUE; | |
} | |
} | |
@Override | |
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
} | |
@Override | |
public boolean canMerge() { | |
return true; | |
} | |
@Override | |
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception { | |
ctx.mergePartitionedState(STATE_DESCRIPTOR); | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.