Last active
February 13, 2020 14:15
-
-
Save alpinegizmo/da80524fb97dcc95863c35a06e1d1280 to your computer and use it in GitHub Desktop.
Simpler per-key triggering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2019 Ververica GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.ververica.flinktraining.examples.datastream_java.windows; | |
import org.apache.flink.api.common.state.MapState; | |
import org.apache.flink.api.common.state.MapStateDescriptor; | |
import org.apache.flink.api.common.state.ValueState; | |
import org.apache.flink.api.common.state.ValueStateDescriptor; | |
import org.apache.flink.api.java.tuple.Tuple4; | |
import org.apache.flink.api.scala.typeutils.Types; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger; | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.util.Collector; | |
import javax.annotation.Nullable; | |
import java.time.Instant; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
public class PerKeyTriggeringSimple { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
DataStream<Event> events = env.addSource(new AssetSource()); | |
SingleOutputStreamOperator<Tuple4<Long, String, Long, String>> windowOperator = events | |
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks()) | |
.keyBy(e -> e.assetId) | |
.window(TumblingEventTimeWindows.of(Time.seconds(10))) | |
.trigger(new AssetTrigger(20000)) | |
.process(new ReportForAsset()); | |
windowOperator.print(); | |
env.execute(); | |
} | |
private static class Event { | |
public final long timestamp; | |
public final long assetId; | |
final private long millisPerDay = (24 * 60 * 60 * 1000); | |
Event(long assetId) { | |
final long noiseForOutOfOrderness = new Random().nextInt(5000); | |
this.assetId = assetId; | |
this.timestamp = Instant.now().toEpochMilli() - assetId * millisPerDay - noiseForOutOfOrderness; | |
} | |
@Override | |
public String toString() { | |
return "Event{" + "assetId=" + assetId + ", @" + timestamp + '}'; | |
} | |
} | |
private static class AssetSource implements SourceFunction<Event> { | |
private volatile boolean running = true; | |
@Override | |
public void run(SourceContext<Event> ctx) throws Exception { | |
while(running) { | |
ctx.collect(new Event(new Random().nextInt(3))); | |
Thread.sleep(1000); | |
} | |
} | |
@Override | |
public void cancel() { | |
running = false; | |
} | |
} | |
private static class TimestampsAndWatermarks implements AssignerWithPunctuatedWatermarks<Event> { | |
@Nullable | |
@Override | |
public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) { | |
return null; | |
} | |
@Override | |
public long extractTimestamp(Event event, long previousElementTimestamp) { | |
return event.timestamp; | |
} | |
} | |
public static class AssetTrigger extends Trigger<Event, TimeWindow> { | |
private final ValueStateDescriptor<Boolean> timerStateDesc = | |
new ValueStateDescriptor<>("timer has been set", Types.BOOLEAN()); | |
private final long interval; | |
public AssetTrigger(long interval) { | |
this.interval = interval; | |
} | |
@Override | |
public boolean canMerge() { | |
return false; | |
} | |
@Override | |
public TriggerResult onElement( | |
Event element, | |
long timestamp, | |
TimeWindow window, | |
TriggerContext ctx) throws Exception { | |
ValueState<Boolean> timer = ctx.getPartitionedState(timerStateDesc); | |
if (timer.value() == null) { | |
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + interval); | |
timer.update(true); | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onProcessingTime( | |
long time, TimeWindow window, TriggerContext ctx) throws Exception { | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
@Override | |
public TriggerResult onEventTime( | |
long time, TimeWindow window, TriggerContext ctx) throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear( | |
TimeWindow window, TriggerContext ctx) throws Exception { | |
ctx.getPartitionedState(timerStateDesc).clear(); | |
} | |
} | |
private static class ReportForAsset extends ProcessWindowFunction<Event, Tuple4<Long, String, Long, String>, Long, TimeWindow> { | |
MapStateDescriptor<Long, Integer> stateDesc = | |
new MapStateDescriptor<>("sizes of previous windows", Types.LONG(), Types.INT()); | |
@Override | |
public void process( | |
Long key, | |
Context context, | |
Iterable<Event> events, | |
Collector<Tuple4<Long, String, Long, String>> out) throws Exception { | |
MapState<Long, Integer> windowSizes = context.globalState().getMapState(stateDesc); | |
long windowStart = context.window().getStart(); | |
List deltas = new ArrayList(); | |
for (Event e : events) { | |
deltas.add(e.timestamp - windowStart); | |
} | |
windowSizes.put(windowStart, deltas.size()); | |
List sizes = new ArrayList(); | |
for (Map.Entry<Long, Integer> entry : windowSizes.entries()) { | |
sizes.add(entry.getValue()); | |
} | |
out.collect(new Tuple4<>(key, sizes.toString(), windowStart, deltas.toString())); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment