Skip to content

Instantly share code, notes, and snippets.

@alpinegizmo
Last active February 13, 2020 14:15
Show Gist options
  • Save alpinegizmo/da80524fb97dcc95863c35a06e1d1280 to your computer and use it in GitHub Desktop.
Save alpinegizmo/da80524fb97dcc95863c35a06e1d1280 to your computer and use it in GitHub Desktop.
Simpler per-key triggering
/*
* Copyright 2019 Ververica GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.flinktraining.examples.datastream_java.windows;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
/**
 * Example job that fires tumbling event-time windows per key using a processing-time trigger.
 *
 * <p>Each synthetic {@link Event} is timestamped {@code assetId} days in the past, so the keys'
 * event-time clocks are far apart. The job does not advance event time with watermarks.
 * Instead, {@link AssetTrigger} fires each key's window a fixed processing-time interval after
 * the window receives its first element.
 */
public class PerKeyTriggeringSimple {

    /** Shared randomness for synthetic asset ids and out-of-orderness noise. */
    private static final Random RANDOM = new Random();

    /**
     * Builds and runs the job.
     *
     * <p>The pipeline keys synthetic events by asset and groups them into 10-second tumbling
     * event-time windows. Windows are fired by a 20-second processing-time trigger and reported
     * by {@link ReportForAsset}.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.addSource(new AssetSource());

        SingleOutputStreamOperator<Tuple4<Long, String, Long, String>> windowOperator = events
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
                .keyBy(e -> e.assetId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new AssetTrigger(20000))
                .process(new ReportForAsset());

        windowOperator.print();

        env.execute();
    }

    /**
     * A synthetic event for one asset.
     *
     * <p>The timestamp is "now" minus {@code assetId} days, minus up to 5 seconds of random noise.
     */
    private static class Event {
        /** Milliseconds in one day; a constant, so it is not carried (or serialized) per event. */
        private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

        public final long timestamp;
        public final long assetId;

        Event(long assetId) {
            // Up to 5 s of jitter so that events within a key arrive somewhat out of order.
            final long noiseForOutOfOrderness = RANDOM.nextInt(5000);
            this.assetId = assetId;
            this.timestamp =
                    Instant.now().toEpochMilli() - assetId * MILLIS_PER_DAY - noiseForOutOfOrderness;
        }

        @Override
        public String toString() {
            return "Event{" + "assetId=" + assetId + ", @" + timestamp + '}';
        }
    }

    /** Emits one event per second for a random asset id in {@code [0, 3)} until cancelled. */
    private static class AssetSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while (running) {
                ctx.collect(new Event(RANDOM.nextInt(3)));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Uses each event's own timestamp and never emits a watermark.
     *
     * <p>{@link #checkAndGetNextWatermark} always returns {@code null}, so event time never
     * advances and windows are fired only by {@link AssetTrigger}'s processing-time timers.
     * NOTE(review): this looks intentional. Per-asset timestamps are offset by whole days, so a
     * single shared watermark would not suit every key.
     */
    private static class TimestampsAndWatermarks implements AssignerWithPunctuatedWatermarks<Event> {
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
            return null;
        }

        @Override
        public long extractTimestamp(Event event, long previousElementTimestamp) {
            return event.timestamp;
        }
    }

    /**
     * Fires and purges a window {@code interval} ms of processing time after the first element
     * arrives.
     *
     * <p>A per-window boolean flag ensures that at most one timer is pending per window.
     */
    public static class AssetTrigger extends Trigger<Event, TimeWindow> {

        private final ValueStateDescriptor<Boolean> timerStateDesc =
                new ValueStateDescriptor<>("timer has been set", Types.BOOLEAN());

        private final long interval;

        /** @param interval processing-time delay in milliseconds between first element and firing */
        public AssetTrigger(long interval) {
            this.interval = interval;
        }

        @Override
        public boolean canMerge() {
            return false;
        }

        @Override
        public TriggerResult onElement(
                Event element,
                long timestamp,
                TimeWindow window,
                TriggerContext ctx) throws Exception {
            ValueState<Boolean> timer = ctx.getPartitionedState(timerStateDesc);
            if (timer.value() == null) {
                ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + interval);
                timer.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(
                long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // PURGE removes only the window's contents, not trigger state. Reset the flag so that
            // an element arriving after this fire arms a new timer instead of being stranded.
            // Resetting also keeps the flag from lingering (event-time cleanup never runs here).
            ctx.getPartitionedState(timerStateDesc).clear();
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(
                long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(
                TimeWindow window, TriggerContext ctx) throws Exception {
            // The pending timer's timestamp is not stored, so it cannot be deleted here; only the
            // flag is cleared.
            ctx.getPartitionedState(timerStateDesc).clear();
        }
    }

    /**
     * Emits one record per window firing: {@code (key, sizes, windowStart, deltas)}.
     *
     * <ul>
     *   <li>{@code sizes} lists the recorded element count of every window seen for this key,
     *       ordered by window start.</li>
     *   <li>{@code deltas} holds each element's offset from the window start, in milliseconds.</li>
     * </ul>
     */
    private static class ReportForAsset
            extends ProcessWindowFunction<Event, Tuple4<Long, String, Long, String>, Long, TimeWindow> {

        /** Per-key (global, cross-window) map of window start -> element count. */
        private final MapStateDescriptor<Long, Integer> stateDesc =
                new MapStateDescriptor<>("sizes of previous windows", Types.LONG(), Types.INT());

        @Override
        public void process(
                Long key,
                Context context,
                Iterable<Event> events,
                Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
            MapState<Long, Integer> windowSizes = context.globalState().getMapState(stateDesc);
            long windowStart = context.window().getStart();

            List<Long> deltas = new ArrayList<>();
            for (Event e : events) {
                deltas.add(e.timestamp - windowStart);
            }

            // put() overwrites: if a window fires more than once, the latest firing's size wins.
            windowSizes.put(windowStart, deltas.size());

            // MapState iteration order is backend-dependent; sort by window start for stable output.
            Map<Long, Integer> sizesByWindowStart = new TreeMap<>();
            for (Map.Entry<Long, Integer> entry : windowSizes.entries()) {
                sizesByWindowStart.put(entry.getKey(), entry.getValue());
            }
            List<Integer> sizes = new ArrayList<>(sizesByWindowStart.values());

            out.collect(new Tuple4<>(key, sizes.toString(), windowStart, deltas.toString()));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment