Skip to content

Instantly share code, notes, and snippets.

@Sam-Serpoosh
Last active August 12, 2019 04:03
Show Gist options
  • Save Sam-Serpoosh/194068bd4e9fea9958bfae1cf618597b to your computer and use it in GitHub Desktop.
Save Sam-Serpoosh/194068bd4e9fea9958bfae1cf618597b to your computer and use it in GitHub Desktop.
Testing TumblingEventTimeWindow of Flink
package com.uber.gairos.flink.operator.flink_operator;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
/**
* Expectations and explanations of these tests are laid out based
* on a conversation which you can find in the following StackOverflow
* question:
*
* - https://stackoverflow.com/questions/57121018/flink-windows-boundaries-watermark-event-timestamp-processing-time
*
* A few points about the layout of the events in the test examples'
* descriptions:
*
* - Only the EVENT TIME of the arriving events is laid out for brevity
* - The relative LEFT/RIGHT placement of the events along the X-axis
* indicates their ARRIVAL time with regard to WALL-CLOCK or
* PROCESSING TIME.
*
* Also, in the assertions you'll notice the outcome/aggregated result
* of the LAST window is verified; EVEN THOUGH, technically the result
* of the LAST window should NOT be triggered due to the state of:
*
* - End Of Window (EOW)
* - Current Watermark (CW)
*
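* I assume this happens because the finite test source, once it is
* exhausted, emits a final watermark of Long.MAX_VALUE, which makes
* Flink FLUSH (fire) the remainder of the job's content/state/windows
* as its execution is wrapped up in the test.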
*/
public class TumblingWindowAggregationBehaviorTest {
// uid assigned to the window-aggregation operator built by the helper methods below
private final static String AGG_ID = "item_aggregator";
// City shared by every test item; items are keyed by city, so they all land on one key
private final static String CHI = "Chicago";
// Size of the tumbling event-time windows used by every test
private final static Time WIN_SIZE = Time.minutes(5);
// ALL the following times belong to Wednesday July 24th 2019 UTC,
// expressed as epoch MILLISECONDS (e.g. UTC_801_AM is 08:01:00 UTC)
private final static Long UTC_801_AM = 1563955260000L;
private final static Long UTC_802_AM = 1563955320000L;
private final static Long UTC_804_AM = 1563955440000L;
private final static Long UTC_806_AM = 1563955560000L;
private final static Long UTC_808_AM = 1563955680000L;
// Execution environment, rebuilt before every test by setupExecEnv()
private StreamExecutionEnvironment execEnv;
// Config of execEnv, captured by setupExecEnv() to tune the watermark interval
private ExecutionConfig execCfg;
/**
* Builds a fresh execution environment before every test: the
* auto-watermark interval is set to ZERO, event-time semantics are
* selected and the parallelism is pinned to 1.
*
* The INTENT behind invoking setAutoWatermarkInterval with the value
* ZERO is to get a watermark emitted after EVERY EVENT, as opposed to
* the normal behavior which emits watermarks periodically when dealing
* with Out-Of-Orderness assigners. That would NOT be efficient in
* production; it is only wanted here so that the stream advances in
* the eyes of Flink after each event.
*
* NOTE(review): that intent is most likely NOT achieved by this code;
* please confirm against the Flink version in use:
*
* - ExecutionConfig#setAutoWatermarkInterval documents an interval of
*   ZERO as DISABLING periodic watermark emission, not as "emit after
*   every event".
* - StreamExecutionEnvironment#setStreamTimeCharacteristic documents
*   that choosing EventTime installs a default watermark interval of
*   200 ms. It is called AFTER setAutoWatermarkInterval(0L) below, so
*   it presumably overwrites the ZERO.
*
* In both cases a tiny, finite input can be fully consumed before any
* periodic watermark is generated, which would explain windows firing
* only when the job finishes. Emitting a watermark per event presumably
* requires a punctuated watermark assigner instead (TODO confirm).
*/
@Before
public void setupExecEnv() {
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execCfg = execEnv.getConfig();
// NOTE(review): order matters here; the setStreamTimeCharacteristic call
// two lines below is documented to reset this interval (see Javadoc above)
execCfg.setAutoWatermarkInterval(0L);
execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// A single parallel instance, so there is only one sink/window subtask
execEnv.setParallelism(1);
}
/**
 * Empties the static result list of the collector sink before every
 * test, so results captured by a previous test never leak into the
 * assertions of the next one.
 */
@Before
public void cleanUpCollector() {
    final List<Item> collected = CollectorSink.values;
    collected.clear();
}
/**
* The WINDOW configuration:
*
* - TumblingEventTimeWindows => 5 MINUTES SIZE
* - Max Out Of Orderness => ZERO
* - Allowed Lateness => DEFAULT (ZERO)
*
* That configuration means:
*
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far
* - End Of Window (EOW)
*
* Events arrive as below:
*
* - 8:04 => [8:00, 8:05) - 8:02 => [8:00, 8:05) - 8:06 => [8:05, 8:10)
* - CW = 8:04 - CW = 8:04 - CW = 8:06
*
* When CW becomes 8:06 which is >= EOW for [8:00, 8:05), that
* window's result should be triggered/fired off AND the window
* itself should be discarded since:
*
* CW (8:06) >= EOW (8:04:59:999) + AllowedLateness (0)
*
* Finally, the overall windows' contents look like:
*
* - [8:00, 8:05): [item1, item2]
* - [8:05, 8:10): [item3]
*
*/
@Test
public void whenZEROOutOfOrdernessAndZEROAllowedLateness() throws Exception {
    // Events in ARRIVAL order: 8:04, 8:02, 8:06 (all keyed by the same city)
    final Item at804 = Item.builder().ts(UTC_804_AM).price(10.0d).city(CHI).build();
    final Item at802 = Item.builder().ts(UTC_802_AM).price(5.0d).city(CHI).build();
    final Item at806 = Item.builder().ts(UTC_806_AM).price(20.0d).city(CHI).build();

    // Max out-of-orderness of ZERO; the event time is read from Item#getTs
    final WatermarkAssigner<Item> assigner =
        new WatermarkAssigner<>(Time.seconds(0), Item::getTs);
    final DataStream<Item> timestamped = execEnv
        .fromElements(at804, at802, at806)
        .assignTimestampsAndWatermarks(assigner);

    // Default allowed lateness (none configured)
    executeJob(aggregateItems(timestamped));

    final List<Item> results = CollectorSink.values;
    assertEquals(2, results.size());
    assertEquals(15, results.get(0).getPrice().intValue()); // [8:00, 8:05): 8:04 & 8:02
    assertEquals(20, results.get(1).getPrice().intValue()); // [8:05, 8:10): 8:06
}
/**
* The WINDOW configuration:
*
* - TumblingEventTimeWindows => 5 MINUTES SIZE
* - Max Out Of Orderness => 2 MINUTES
* - Allowed Lateness => DEFAULT (ZERO)
*
* That configuration means:
*
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far - max-out-of-orderness (2 minutes)
* - End Of Window (EOW)
*
* Events arrive as below:
*
* - 8:04 => [8:00, 8:05) - 8:06 => [8:05, 8:10) - 8:01 => [8:00, 8:05) - 8:08 => [8:05, 8:10)
* - CW = 8:04 - 2 = 8:02 - CW = 8:06 - 2 = 8:04 - CW = 8:06 - 2 = 8:04 - CW = 8:08 - 2 = 8:06
*
*
* When CW becomes 8:06 which is >= EOW for [8:00, 8:05), that
* window's result should be triggered/fired off AND the window
* itself should be discarded since:
*
* CW (8:06) >= EOW (8:04:59:999) + AllowedLateness (0)
*
* Due to our MaxOutOfOrderness of 2 MINUTES, we're basically shifting
* back our CW by, well, 2 minutes.
*
* In the PREVIOUS test, the arrival of the event with event time of 8:06
* triggered the result of window [8:00, 8:05). BUT, in this test, you can
* see that the arrival of the event with event time of 8:08 triggered
* the result of that SAME window [8:00, 8:05).
*
* Finally, the overall windows' contents look like:
*
* - [8:00, 8:05): [item1, item3]
* - [8:05, 8:10): [item2, item4]
*/
@Test
public void when2MinutesOutOfOrdernessAndZEROAllowedLateness() throws Exception {
    // Events in ARRIVAL order: 8:04, 8:06, 8:01, 8:08 (all keyed by the same city)
    final Item at804 = Item.builder().ts(UTC_804_AM).price(10.0d).city(CHI).build();
    final Item at806 = Item.builder().ts(UTC_806_AM).price(20.0d).city(CHI).build();
    final Item at801 = Item.builder().ts(UTC_801_AM).price(13.0d).city(CHI).build();
    final Item at808 = Item.builder().ts(UTC_808_AM).price(40.0d).city(CHI).build();

    // Max out-of-orderness of 2 MINUTES; the event time is read from Item#getTs
    final WatermarkAssigner<Item> assigner =
        new WatermarkAssigner<>(Time.minutes(2), Item::getTs);
    final DataStream<Item> timestamped = execEnv
        .fromElements(at804, at806, at801, at808)
        .assignTimestampsAndWatermarks(assigner);

    // Default allowed lateness (none configured)
    executeJob(aggregateItems(timestamped));

    final List<Item> results = CollectorSink.values;
    assertEquals(2, results.size());
    assertEquals(23, results.get(0).getPrice().intValue()); // [8:00, 8:05): 8:04 & 8:01
    assertEquals(60, results.get(1).getPrice().intValue()); // [8:05, 8:10): 8:06 & 8:08
}
/**
* The WINDOW configuration:
*
* - TumblingEventTimeWindows => 5 MINUTES SIZE
* - Max Out Of Orderness => ZERO
* - Allowed Lateness => 2 MINUTES
*
* That configuration means:
*
* - CURRENT WATERMARK (CW) = max-event-time-seen-so-far
* - End Of Window (EOW)
*
* Events arrive as below:
*
* - 8:04 => [8:00, 8:05) - 8:06 => [8:05, 8:10) - 8:01 => [8:00, 8:05) - 8:02 => [8:00, 8:05) - 8:08 => [8:05, 8:10)
* - CW = 8:04 - CW = 8:06 - CW = 8:06 - CW = 8:06 - CW = 8:08
*
*
* When CW becomes 8:06 as the result of the arrival of the event
* with 8:06 event time, which is >= EOW for [8:00, 8:05), that
* window's result SHOULD BE triggered.
*
* But, since we have set an Allowed Lateness of 2 MINUTES, this window
* SHOULD BE KEPT and NOT discarded yet. While the EOW < CW < (EOW + AL),
* ANY EVENT which arrives and belongs to [8:00, 8:05), SHOULD cause an
* updated result for THAT window to be fired.
*
* This means the windows' states will be like the following:
*
* - [8:00, 8:05): [item1] => result1
* - [8:05, 8:10): [item2]
* - [8:00, 8:05): [item1, item3] => result2
* - [8:00, 8:05): [item1, item3, item4] => result3
* - [8:05, 8:10): [item2, item5] => result4
*
* When the event with EVENT_TIME of 8:08 arrives, it'll push
* the CW to become 8:08. At which point we'll have:
*
* CW (8:08) >= EOW (8:04:59:999) + AllowedLateness (2)
*
* Hence, the window [8:00, 8:05) will be discarded. Any event
* arriving AFTER this point and belonging to [8:00, 8:05) will
* be considered TOO LATE and discarded.
*
*/
@Test
public void whenZEROOutOfOrdernessAnd2MinutesAllowedLateness() throws Exception {
    // Events in ARRIVAL order: 8:04, 8:06, 8:01, 8:02, 8:08 (same city key)
    final Item at804 = Item.builder().ts(UTC_804_AM).price(10.0d).city(CHI).build();
    final Item at806 = Item.builder().ts(UTC_806_AM).price(20.0d).city(CHI).build();
    final Item at801 = Item.builder().ts(UTC_801_AM).price(13.0d).city(CHI).build();
    final Item at802 = Item.builder().ts(UTC_802_AM).price(7.0d).city(CHI).build();
    final Item at808 = Item.builder().ts(UTC_808_AM).price(40.0d).city(CHI).build();

    // Max out-of-orderness of ZERO; the event time is read from Item#getTs
    final WatermarkAssigner<Item> assigner =
        new WatermarkAssigner<>(Time.seconds(0), Item::getTs);
    final DataStream<Item> timestamped = execEnv
        .fromElements(at804, at806, at801, at802, at808)
        .assignTimestampsAndWatermarks(assigner);

    // Allowed lateness of 2 MINUTES on the window operator
    executeJob(aggregateItemsWithAllowedLateness(timestamped, Time.minutes(2)));

    /*
     * Unfortunately the EXPECTED behavior laid out in the description
     * of this test is NOT what we observe!!! ¯\_(ツ)_/¯
     *
     * Expected (these assertions FAIL): FOUR results, in this order:
     *
     *   - 10 => item at 8:04
     *   - 23 => items at 8:04 & 8:01
     *   - 30 => items at 8:04, 8:01 & 8:02
     *   - 60 => items at 8:06 & 8:08
     *
     * Instead, we're getting the behavior shown by the assertions
     * below: only TWO results, one per window. This violates our
     * expectation and understanding with regard to how
     * Allowed-Lateness behaves. Unless that understanding is
     * FLAWED!!! ¯\_(ツ)_/¯
     *
     * NOTE(review): a likely explanation is that no watermark is
     * emitted between the events at all, so both windows fire only
     * once, when the job finishes. TODO confirm.
     */
    final List<Item> results = CollectorSink.values;
    assertEquals(2, results.size());
    assertEquals(30, results.get(0).getPrice().intValue()); // 8:04, 8:01 & 8:02
    assertEquals(60, results.get(1).getPrice().intValue()); // 8:06 & 8:08
}
// Naturally the next test case would be something like:
//
// - Tumbling Window : 5 minutes size
// - Max Out Of Orderness: 2 minutes
// - Allowed Lateness : 1 minute
//
// That one is left as an exercise for whoever is interested ;)
/**
 * Keys the items by city, groups them into tumbling event-time windows
 * of WIN_SIZE and folds every window into a single Item through
 * ItemAggregator. No allowed lateness is configured here.
 *
 * @param items The timestamped input stream.
 * @return The stream of per-window aggregation results.
 */
private DataStream<Item> aggregateItems(DataStream<Item> items) {
    final KeySelector<Item, String> byCity = Item::getCity;
    final TumblingEventTimeWindows tumblingWindows = TumblingEventTimeWindows.of(WIN_SIZE);
    final ItemAggregator aggregator = new ItemAggregator();
    return items
        .keyBy(byCity)
        .window(tumblingWindows)
        .aggregate(aggregator)
        .uid(AGG_ID);
}
/**
 * Keys the items by city, groups them into tumbling event-time windows
 * of WIN_SIZE with the given allowed lateness, and folds every window
 * into a single Item through ItemAggregator.
 *
 * @param items    The timestamped input stream.
 * @param lateness The allowed lateness handed to the window operator.
 * @return The stream of per-window aggregation results.
 */
private DataStream<Item> aggregateItemsWithAllowedLateness(DataStream<Item> items, Time lateness) {
    final KeySelector<Item, String> byCity = Item::getCity;
    final TumblingEventTimeWindows tumblingWindows = TumblingEventTimeWindows.of(WIN_SIZE);
    final ItemAggregator aggregator = new ItemAggregator();
    return items
        .keyBy(byCity)
        .window(tumblingWindows)
        .allowedLateness(lateness)
        .aggregate(aggregator)
        .uid(AGG_ID);
}
/**
 * Attaches a CollectorSink to the given operator output and then runs
 * the job on the test's execution environment.
 *
 * @param opOutput The output of an operator; it is captured via the
 *                 Collector Sink so it can be evaluated for
 *                 verification purposes once this method returns.
 * @throws Exception Whatever the job execution throws.
 */
protected void executeJob(DataStream<Item> opOutput) throws Exception {
    final CollectorSink sink = new CollectorSink();
    opOutput.addSink(sink);
    execEnv.execute();
}
/**
 * Test sink which records every element it receives in a STATIC list,
 * so the test can inspect the operator's output after the job is done.
 *
 * Since the list is static, it is shared by ALL CollectorSink
 * instances and must be cleared between tests.
 */
protected static class CollectorSink implements SinkFunction<Item> {
    public static final List<Item> values = new ArrayList<>();

    /**
     * Appends the received item to the shared result list.
     *
     * @param item The element emitted by the upstream operator.
     */
    @Override
    public void invoke(Item item) {
        // Lock on the shared list itself. A synchronized INSTANCE method
        // only locks the current sink instance, which does not protect a
        // static list that other sink instances can write to as well.
        synchronized (values) {
            values.add(item);
        }
    }
}
}
/**
 * Aggregates the Items of a window into ONE Item which carries:
 *
 * - price: the SUM of the prices of all aggregated items
 * - ts   : the LATEST timestamp among the aggregated items
 * - city : the city of the first aggregated item
 *
 * Input, accumulator and result are all of type Item.
 */
class ItemAggregator implements AggregateFunction<Item, Item, Item> {
    /**
     * @return An EMPTY accumulator: price of ZERO, no timestamp and no
     *         city. A missing (null) timestamp marks the accumulator as
     *         not having seen any item yet.
     */
    @Override
    public Item createAccumulator() {
        return Item
            .builder()
            .ts(null)
            .price(0.0d)
            .build();
    }

    /**
     * Folds one item into the accumulator (which is mutated in place).
     *
     * @param item        The newly arrived item.
     * @param accumulator The accumulator of the item's window.
     * @return The very same accumulator instance, updated.
     */
    @Override
    public Item add(Item item, Item accumulator) {
        // For the FIRST time, BRAND NEW accumulator
        // EACH WINDOW will have its OWN accumulator
        if (accumulator.getTs() == null) {
            accumulator.setCity(item.getCity());
        }
        // Null-safe: an item WITHOUT a timestamp keeps the current one
        // instead of blowing up while unboxing for the comparison
        accumulator.setTs(latest(accumulator.getTs(), item.getTs()));
        accumulator.addUp(item);
        return accumulator;
    }

    /**
     * @param accumulator The accumulator of a window.
     * @return The accumulator itself, which doubles as the result.
     */
    @Override
    public Item getResult(Item accumulator) {
        return accumulator;
    }

    /**
     * Combines two PARTIAL aggregates into a brand new one; neither of
     * the inputs is modified.
     *
     * Both inputs are accumulators, so the merged result keeps the
     * LATEST timestamp of the two and falls back to the other input's
     * city when one of them is still empty.
     *
     * @param item        The first partial aggregate.
     * @param accumulator The second partial aggregate.
     * @return A new Item holding the combined price, timestamp and city.
     */
    @Override
    public Item merge(Item item, Item accumulator) {
        String city = accumulator.getCity() != null
            ? accumulator.getCity()
            : item.getCity();
        Item merged = Item
            .builder()
            .city(city)
            .price(accumulator.getPrice())
            .ts(latest(accumulator.getTs(), item.getTs()))
            .build();
        merged.addUp(item);
        return merged;
    }

    /** Returns the greater of two timestamps, treating null as "absent". */
    private static Long latest(Long left, Long right) {
        if (left == null) {
            return right;
        }
        if (right == null) {
            return left;
        }
        return Math.max(left, right);
    }
}
/**
 * Timestamp extractor whose event time is read from each event by a
 * caller-supplied function; the max out-of-orderness is handed to the
 * BoundedOutOfOrdernessTimestampExtractor base class as-is.
 *
 * @param <T> The type of the events flowing through the stream.
 */
class WatermarkAssigner<T> extends BoundedOutOfOrdernessTimestampExtractor<T> {
    // Held as a SerializableFunc (rather than a plain Function) so that
    // this field is Serializable as well
    private final SerializableFunc<T, Long> eventTimeExtractor;

    /**
     * @param maxOutOfOrderness  Forwarded to the base class.
     * @param eventTimeExtractor Reads the event time out of an event.
     */
    WatermarkAssigner(
        Time maxOutOfOrderness,
        final SerializableFunc<T, Long> eventTimeExtractor
    ) {
        super(maxOutOfOrderness);
        this.eventTimeExtractor = eventTimeExtractor;
    }

    /**
     * @param event The event to read the time from.
     * @return Whatever the extractor function yields for the event.
     */
    @Override
    public long extractTimestamp(T event) {
        final Long eventTime = eventTimeExtractor.apply(event);
        return eventTime;
    }
}
/**
 * Mutable value object used both as the stream element and as the
 * aggregation accumulator/result. The builder, accessors, constructors
 * and toString come from the Lombok annotations.
 */
@Getter
@Setter
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
class Item {
    // Event time of the item; the tests use epoch milliseconds
    private Long ts;
    // Price of the item, or the running total when used as an accumulator
    private Double price;
    // City of the item; the tests key the stream by it
    private String city;

    /**
     * Adds the other item's price onto this item's price. Only the
     * price is touched; ts and city are left as they are.
     *
     * @param other The item whose price is added.
     */
    void addUp(Item other) {
        price = price + other.price;
    }
}
/**
 * A {@link Function} which is also {@link Serializable}, so lambdas and
 * method references assigned to it can be stored in fields of
 * serializable objects.
 *
 * @param <IN>  The type of the function's input.
 * @param <OUT> The type of the function's result.
 */
@FunctionalInterface
interface SerializableFunc<IN, OUT> extends Function<IN, OUT>, Serializable {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment