@manasIU
Created February 27, 2020 10:57
Special trigger logic with broadcastState
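The job below connects a keyed event stream from `input-topic` with a broadcast configuration stream from `config-topic`, then assigns the forwarded events to event-time session windows with a 5-second gap. A custom `ElementTimeWindowTrigger` fires each window early the first time an element arrives more than `MIN_WINDOW_SIZE` (900 ms) after the window start, and fires again when the watermark reaches the window's max timestamp; `MyWindowFunction` uses per-key state to emit a "window started" marker on the early firing and the concatenated window contents on the final one.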
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class test {

    private static final long MIN_WINDOW_SIZE = 900L;
    // Assumed local broker address; the original gist references LOCAL_KAFKA without defining it.
    private static final String LOCAL_KAFKA = "localhost:9092";

    private static FlinkKafkaConsumer<ObjectNode> getFlinkKafkaConsumer(String topic, boolean isEventTimeDriven) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", LOCAL_KAFKA);
        FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(topic,
                new JSONKeyValueDeserializationSchema(false), properties);
        if (isEventTimeDriven) {
            // Event time is taken from the "timestamp" field of the Kafka message value.
            consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ObjectNode>() {
                @Override
                public long extractAscendingTimestamp(ObjectNode jsonNodes) {
                    return jsonNodes.at("/value/timestamp").asLong();
                }
            });
        }
        return consumer;
    }
    public static void main(String[] args) throws Exception {
        MapStateDescriptor<String, String> dummyDesc =
                new MapStateDescriptor<>("dummy", String.class, String.class);

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer<ObjectNode> configurationConsumer = getFlinkKafkaConsumer("config-topic", false);
        BroadcastStream<ObjectNode> configurationBroadcastStream = env.addSource(configurationConsumer).broadcast(dummyDesc);

        FlinkKafkaConsumer<ObjectNode> eventConsumer = getFlinkKafkaConsumer("input-topic", true);
        DataStream<ObjectNode> elements = env.addSource(eventConsumer);

        elements
                .keyBy((KeySelector<ObjectNode, Integer>) element -> 0)
                .connect(configurationBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<Integer, ObjectNode, ObjectNode, ObjectNode>() {
                    @Override
                    public void processElement(ObjectNode value, ReadOnlyContext ctx, Collector<ObjectNode> out) throws Exception {
                        // Pass events through unchanged; broadcast state is not consulted here.
                        out.collect(value);
                    }

                    @Override
                    public void processBroadcastElement(ObjectNode value, Context ctx, Collector<ObjectNode> out) throws Exception {
                        // Configuration updates are ignored in this minimal example.
                    }
                })
                .keyBy((KeySelector<ObjectNode, Integer>) value -> 0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5L)))
                .trigger(new ElementTimeWindowTrigger())
                .process(new MyWindowFunction()).setParallelism(1).print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
    public static class MyWindowFunction extends ProcessWindowFunction<ObjectNode, String, Integer, TimeWindow> {

        private static final ValueStateDescriptor<Boolean> STATE_DESCRIPTOR = new ValueStateDescriptor<>("state", Boolean.class);

        @Override
        public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<String> collector) throws Exception {
            // Per-key flag (keyed state, shared across windows) marking that the early firing has happened.
            final ValueState<Boolean> valueState = getRuntimeContext().getState(STATE_DESCRIPTOR);
            final TimeWindow timeWindow = context.window();
            final Boolean state = valueState.value();
            if (state == null || !state) {
                // First (early) firing: only announce the window start.
                collector.collect(String.format("Window started @ %s", timeWindow.getStart()));
                valueState.update(true);
            } else {
                // Final firing: emit the concatenated window contents and the window end.
                final StringBuilder builder = new StringBuilder();
                for (ObjectNode element : iterable) {
                    builder.append(element.get("value")).append("_");
                }
                collector.collect(builder.toString());
                collector.collect(String.format("Window ended @ %s", timeWindow.getEnd()));
            }
        }
    }
    public static class Element {

        private final long timestamp;
        private final String value;

        public Element(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getValue() {
            return value;
        }

        public static Element from(String value, long timestamp) {
            return new Element(timestamp, value);
        }
    }

    enum State {
        FIRST,
    }
    public static class ElementTimeWindowTrigger extends Trigger<ObjectNode, TimeWindow> {

        // Keeps the "highest" State seen so far; with a single enum value this simply
        // records whether the early firing has already happened for this window.
        private static final ReducingStateDescriptor<State> STATE_DESCRIPTOR = new ReducingStateDescriptor<>("State", new ReduceFunction<State>() {
            @Override
            public State reduce(State state, State t1) throws Exception {
                return state.compareTo(t1) > 0 ? state : t1;
            }
        }, State.class);

        @Override
        public TriggerResult onElement(ObjectNode element, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            final ReducingState<State> valueState = triggerContext.getPartitionedState(STATE_DESCRIPTOR);
            // Always make sure a timer exists for the final firing at the end of the window.
            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
            // System.out.println("Registered timer for " + timeWindow.maxTimestamp());
            if (l > timeWindow.getStart() + MIN_WINDOW_SIZE) {
                State state = valueState.get();
                if (state == null) {
                    // Fire early exactly once, the first time an element arrives more than
                    // MIN_WINDOW_SIZE ms after the window start.
                    valueState.add(State.FIRST);
                    System.out.println("Triggering because first value was added to window. maxTimestamp :" +
                            timeWindow.maxTimestamp());
                    return TriggerResult.FIRE;
                } else {
                    return TriggerResult.CONTINUE;
                }
            } else {
                return TriggerResult.CONTINUE;
            }
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            if (l == timeWindow.maxTimestamp()) {
                System.out.println("Triggering because l : " + l + " maxTimestamp : " + timeWindow.maxTimestamp());
                return TriggerResult.FIRE;
            } else {
                System.out.println("Continuing because l : " + l + " maxTimestamp : " + timeWindow.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            triggerContext.getPartitionedState(STATE_DESCRIPTOR).clear();
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
            ctx.mergePartitionedState(STATE_DESCRIPTOR);
        }
    }
}
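
To exercise the job end to end, events need to land on `input-topic` as JSON whose value carries a numeric `timestamp` field, since that is what the timestamp extractor's `/value/timestamp` lookup expects. The following is a minimal sketch of such a producer, assuming a broker on `localhost:9092` and `kafka-clients` on the classpath; the `TestEventProducer` class and the payload fields other than `timestamp` are illustrative, not part of the original gist.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper, not part of the gist: publishes JSON events whose value
// contains the "timestamp" field read by the Flink job above.
public class TestEventProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long base = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                // Event time advances by 1 second per record; the session gap in the job is 5 seconds.
                String value = String.format("{\"timestamp\": %d, \"id\": \"event-%d\"}", base + i * 1000L, i);
                // No key is set, so JSONKeyValueDeserializationSchema only has to parse the value.
                producer.send(new ProducerRecord<>("input-topic", value));
            }
            producer.flush();
        }
    }
}

Note that because the watermark only advances as newer events arrive, the final firing of the last session window will not happen until later events push the watermark past that window's end.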