Created
February 27, 2020 10:57
-
-
Save manasIU/1777c9c99e195a409441815559094b49 to your computer and use it in GitHub Desktop.
Special trigger logic with broadcastState
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.functions.ReduceFunction; | |
import org.apache.flink.api.common.state.*; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.BroadcastStream; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; | |
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; | |
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; | |
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; | |
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows; | |
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; | |
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger; | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; | |
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; | |
import org.apache.flink.util.Collector; | |
import java.util.Properties; | |
public class test { | |
private static final long MIN_WINDOW_SIZE = 900L; | |
private static FlinkKafkaConsumer<ObjectNode> getFlinkKafakConsumer(String topic, boolean isEventTimeDriven) { | |
Properties properties = new Properties(); | |
properties.setProperty("bootstrap.servers", LOCAL_KAFKA); | |
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer(topic, | |
new JSONKeyValueDeserializationSchema(false), properties); | |
if (isEventTimeDriven) { | |
consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ObjectNode>() { | |
@Override | |
public long extractAscendingTimestamp(ObjectNode jsonNodes) { | |
return jsonNodes.at("/value/timestamp").asLong(); | |
} | |
}); | |
} | |
return consumer; | |
} | |
public static void main(String[] args) throws Exception { | |
MapStateDescriptor<String, String> dummyDesc = | |
new MapStateDescriptor<String, String>("dummy", | |
String.class, String.class); | |
// set up the streaming execution environment | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
FlinkKafkaConsumer<ObjectNode> configurationConsumer = getFlinkKafakConsumer("config-topic", false); | |
BroadcastStream<ObjectNode> configurationBroadcastStream = env.addSource(configurationConsumer).broadcast(dummyDesc); | |
FlinkKafkaConsumer<ObjectNode> eventConsumer = getFlinkKafakConsumer("input-topic", true); | |
DataStream<ObjectNode> elements = env.addSource(eventConsumer); | |
elements | |
.keyBy((KeySelector<ObjectNode, Integer>) element -> 0). | |
connect(configurationBroadcastStream). | |
process(new KeyedBroadcastProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>() { | |
@Override | |
public void processElement(ObjectNode value, ReadOnlyContext ctx, Collector<ObjectNode> out) throws Exception { | |
out.collect(value); | |
} | |
@Override | |
public void processBroadcastElement(ObjectNode value, Context ctx, Collector<ObjectNode> out) throws Exception { | |
} | |
}). | |
keyBy((KeySelector<ObjectNode, Integer>) value -> 0). | |
window(EventTimeSessionWindows.withGap(Time.seconds(5L))) | |
.trigger(new ElementTimeWindowTrigger()) | |
.process(new MyWindowFunction()).setParallelism(1).print(); | |
// execute program | |
env.execute("Flink Streaming Java API Skeleton"); | |
} | |
public static class MyWindowFunction extends ProcessWindowFunction<ObjectNode, String, Integer, TimeWindow> { | |
private static final ValueStateDescriptor<Boolean> STATE_DESCRIPTOR = new ValueStateDescriptor<Boolean>("state", Boolean.class); | |
@Override | |
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<String> collector) throws Exception { | |
final ValueState<Boolean> valueState = getRuntimeContext().getState(STATE_DESCRIPTOR); | |
final TimeWindow timeWindow = context.window(); | |
final Boolean state = valueState.value(); | |
if (state == null || !state) { | |
collector.collect(String.format("Window started @ %s", timeWindow.getStart())); | |
valueState.update(true); | |
} else { | |
final StringBuilder builder = new StringBuilder(); | |
for (ObjectNode element : iterable) { | |
builder.append(element.get("value")).append("_"); | |
} | |
collector.collect(builder.toString()); | |
collector.collect(String.format("Window ended @ %s", timeWindow.getEnd())); | |
} | |
} | |
} | |
public static class Element { | |
private final long timestamp; | |
private final String value; | |
public Element(long timestamp, String value) { | |
this.timestamp = timestamp; | |
this.value = value; | |
} | |
public long getTimestamp() { | |
return timestamp; | |
} | |
public String getValue() { | |
return value; | |
} | |
public static Element from(String value, long timestamp) { | |
return new Element(timestamp, value); | |
} | |
} | |
/**
 * Marker stored in the trigger's reducing state.
 */
enum State {
    /** Recorded once the trigger has performed its early firing for a window. */
    FIRST
}
public static class ElementTimeWindowTrigger extends Trigger<ObjectNode, TimeWindow> { | |
private static final ReducingStateDescriptor<State> STATE_DESCRIPTOR = new ReducingStateDescriptor<State>("State", new ReduceFunction<State>() { | |
@Override | |
public State reduce(State state, State t1) throws Exception { | |
return state.compareTo(t1) > 0 ? state : t1; | |
} | |
}, State.class); | |
@Override | |
public TriggerResult onElement(ObjectNode element, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
final ReducingState<State> valueState = triggerContext.getPartitionedState(STATE_DESCRIPTOR); | |
triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp()); | |
// System.out.println("Registered timer for " + timeWindow.maxTimestamp()); | |
if (l > timeWindow.getStart() + MIN_WINDOW_SIZE) { | |
State state = valueState.get(); | |
if (state == null) { | |
valueState.add(State.FIRST); | |
System.out.println("Triggering because first value was added to window. maxTimestamp :" + | |
timeWindow.maxTimestamp()); | |
return TriggerResult.FIRE; | |
} else { | |
return TriggerResult.CONTINUE; | |
} | |
} else{ | |
return TriggerResult.CONTINUE; | |
} | |
} | |
@Override | |
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
if (l == timeWindow.maxTimestamp()) { | |
System.out.println("Triggering because l : " + l + " maxTimestamp : " + timeWindow.maxTimestamp() ); | |
return TriggerResult.FIRE; | |
} | |
else { | |
System.out.println("Continuing because l : " + l + " maxTimestamp : " + timeWindow.maxTimestamp() ); | |
return TriggerResult.CONTINUE; | |
} | |
// return l == timeWindow.maxTimestamp() ? | |
// TriggerResult.FIRE : | |
// TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
triggerContext.getPartitionedState(STATE_DESCRIPTOR).clear(); | |
} | |
@Override | |
public boolean canMerge() { | |
return true; | |
} | |
@Override | |
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception { | |
ctx.mergePartitionedState(STATE_DESCRIPTOR); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment