Skip to content

Instantly share code, notes, and snippets.

@mxm
Last active October 26, 2016 17:11
Show Gist options
  • Save mxm/4ca8310b9d46bd1a7be47edc8ea75c22 to your computer and use it in GitHub Desktop.
Save mxm/4ca8310b9d46bd1a7be47edc8ea75c22 to your computer and use it in GitHub Desktop.
ZeroElementEmittingSlidingWindowOperator
package org.myorg.quickstart;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.*;
public class Tobias2 {
/**
 * Demo job: generates the numbers 1..1000, uses each number as its own event
 * timestamp (see {@code CustomAssigner}), puts every number under the single
 * key {@code 1L} and feeds the keyed stream into the custom sliding window
 * operator, which counts the elements handed to it. The results are printed.
 *
 * @param args unused
 * @throws Exception if the job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Wraps each number into (key, value) with the constant key 1L.
    MapFunction<Long, Tuple2<Long, Long>> attachConstantKey =
            new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long value) throws Exception {
                    return new Tuple2<>(1L, value);
                }
            };

    final KeyedStream<Tuple2<Long, Long>, Tuple> keyedNumbers = env
            .generateSequence(1, 1000)
            .assignTimestampsAndWatermarks(new CustomAssigner())
            .map(attachConstantKey)
            .keyBy(0);

    // The apply function which does the work per sliding window: it counts the elements.
    ZeroElementEmittingSlidingWindowOperator.ApplyFunction<Tuple2<Long, Long>, Long> countElements =
            new ZeroElementEmittingSlidingWindowOperator.ApplyFunction<Tuple2<Long, Long>, Long>() {
                @Override
                public Long apply(Iterable<Tuple2<Long, Long>> elements) {
                    long seen = 0;
                    Iterator<Tuple2<Long, Long>> cursor = elements.iterator();
                    while (cursor.hasNext()) {
                        cursor.next();
                        seen++;
                    }
                    return seen;
                }
            };

    final Time windowSize = Time.milliseconds(30);
    // Value emitted by the operator when a key's buffer has been emptied.
    final Long emptyWindowResult = 0L;

    final SingleOutputStreamOperator<Long> windowCounts = keyedNumbers.transform(
            "ZeroElementEmittingSlidingWindowOperator",
            TypeExtractor.createTypeInfo(Long.class),
            new ZeroElementEmittingSlidingWindowOperator<>(
                    windowSize,
                    keyedNumbers.getKeySelector(),
                    countElements,
                    emptyWindowResult));

    windowCounts.print();
    env.execute();
}
/**
 * Custom Watermark/Timestamp assigner for demoing purposes.
 *
 * <p>The element value itself is used as the event timestamp. The remembered
 * watermark is only touched on elements whose timestamp is a multiple of 10:
 * while the remembered watermark is below 200 it is set to the element value,
 * afterwards it jumps to {@code Long.MAX_VALUE}. A watermark carrying the
 * remembered value is returned for every element.
 */
private static class CustomAssigner implements AssignerWithPunctuatedWatermarks<Long> {

    /** Most recently chosen watermark; starts at {@code Long.MIN_VALUE}. */
    private long lastWaterMark = Long.MIN_VALUE;

    /**
     * Uses the element value as its timestamp.
     *
     * @param element the stream element
     * @param previousElementTimestamp ignored
     * @return {@code element}
     */
    @Override
    public long extractTimestamp(Long element, long previousElementTimestamp) {
        return element;
    }

    /**
     * Updates the remembered watermark on every timestamp divisible by 10 and
     * returns it wrapped in a {@link Watermark}.
     *
     * @param lastElement the element that was just assigned a timestamp
     * @param extractedTimestamp the timestamp assigned to {@code lastElement}
     * @return a watermark holding the remembered watermark value (never {@code null} here)
     */
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
        final boolean onTenBoundary = extractedTimestamp % 10 == 0;
        if (onTenBoundary && lastWaterMark >= 200) {
            // drop all remaining elements as late
            lastWaterMark = Long.MAX_VALUE;
        } else if (onTenBoundary) {
            lastWaterMark = lastElement;
        }
        return new Watermark(lastWaterMark);
    }
}
/**
* Generic sliding window operator which fires on every element
* and emits a 0 element when no more data arrives.
*/
private static class ZeroElementEmittingSlidingWindowOperator<KEY, IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> {
/** Window length in milliseconds. */
private final long windowSize;

/** Extracts the key under which an incoming value is buffered. */
private final KeySelector<IN, KEY> keySelector;

/** Timestamp of the most recent watermark seen; {@code Long.MIN_VALUE} until the first one arrives. */
private long currentWatermark = Long.MIN_VALUE;

/** Per-key buffer of records, each ordered by the timestamp comparator created in {@code open()}. */
private final Map<KEY, TreeSet<StreamRecord<IN>>> state = new HashMap<>();

/** User aggregation applied to a key's buffered elements when the operator fires. */
private final ApplyFunction<IN, OUT> userFunction;

/** Value emitted when a key's buffer becomes empty during watermark cleanup. */
private final OUT zeroElement;

/** Orders records by timestamp; transient, so it is (re)created in {@code open()}. */
private transient Comparator<StreamRecord<IN>> comparator;

/** Watermark value at which the operator last fired. */
private long lastTriggered = Long.MIN_VALUE;

/**
 * Creates the operator.
 *
 * @param windowSize length of the sliding window; converted to milliseconds
 * @param keySelector extracts the key of each incoming value
 * @param userFunction aggregation applied to the buffered elements of a key
 * @param zeroElement value emitted once a key has no buffered elements left
 * @throws NullPointerException if {@code windowSize}, {@code keySelector} or
 *         {@code userFunction} is {@code null}
 */
public ZeroElementEmittingSlidingWindowOperator(
        Time windowSize,
        KeySelector<IN, KEY> keySelector,
        ApplyFunction<IN, OUT> userFunction,
        OUT zeroElement) {
    // Fail fast with a descriptive message instead of a bare NPE when the
    // first element or watermark is processed.
    this.windowSize = Objects.requireNonNull(windowSize, "windowSize").toMilliseconds();
    this.keySelector = Objects.requireNonNull(keySelector, "keySelector");
    this.userFunction = Objects.requireNonNull(userFunction, "userFunction");
    this.zeroElement = zeroElement;
}
/**
 * Creates the comparator that orders buffered records by ascending timestamp.
 * The comparator field is transient, so it is built here rather than in the
 * constructor.
 *
 * <p>NOTE(review): records with equal timestamps compare as equal, so a
 * {@code TreeSet} built with this comparator keeps only one record per
 * timestamp and key. Confirm that this is intended.
 *
 * @throws Exception declared by the overridden method; not thrown here
 */
@Override
public void open() throws Exception {
    this.comparator = new Comparator<StreamRecord<IN>>() {
        @Override
        public int compare(StreamRecord<IN> elem1, StreamRecord<IN> elem2) {
            // Long.compare cannot overflow, whereas the sign of a raw
            // subtraction flips for timestamps that are far apart
            // (e.g. Long.MIN_VALUE versus a positive timestamp).
            return Long.compare(elem1.getTimestamp(), elem2.getTimestamp());
        }
    };
}
/**
 * Buffers an incoming record under its key and, if the watermark has changed
 * since the operator last fired, applies the user function to that key's
 * buffered elements up to the current watermark and emits the result.
 *
 * <p>Records whose timestamp plus the window size is not beyond the current
 * watermark are considered late and dropped without buffering.
 *
 * @param element the incoming record
 * @throws Exception if key extraction or downstream collection fails
 */
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // only compute on new elements, drop late elements
    final boolean isLate = element.getTimestamp() + windowSize <= currentWatermark;
    if (isLate) {
        return;
    }

    // Buffer the record in the sorted set of its key, creating the set on first use.
    final KEY recordKey = keySelector.getKey(element.getValue());
    TreeSet<StreamRecord<IN>> buffered = state.get(recordKey);
    if (buffered == null) {
        buffered = new TreeSet<>(comparator);
        state.put(recordKey, buffered);
    }
    buffered.add(element);

    // trigger logic: fire at most once per distinct watermark value
    if (lastTriggered == currentWatermark) {
        return;
    }
    final long upperBound = currentWatermark;
    final long lowerBound = upperBound - windowSize + 1;
    System.out.println("triggering from " + lowerBound + " to " + upperBound + " Watermark: " + currentWatermark);
    final OUT result = userFunction.apply(new TimestampFilteringIterable(buffered, upperBound));
    this.output.collect(new StreamRecord<>(result, currentWatermark));
    lastTriggered = currentWatermark;
}
/**
 * Advances the operator's watermark, evicts records that have slid out of the
 * window, emits the zero element for every key whose buffer became empty and
 * finally forwards the watermark downstream.
 *
 * <p>A watermark equal to the current one is ignored entirely (it is not
 * forwarded either).
 *
 * @param mark the incoming watermark
 * @throws Exception if downstream collection fails
 */
@Override
public void processWatermark(Watermark mark) throws Exception {
    final long incoming = mark.getTimestamp();
    if (incoming == currentWatermark) {
        // nothing changed
        return;
    }
    currentWatermark = incoming;

    // cleanup routine
    for (Iterator<TreeSet<StreamRecord<IN>>> queues = state.values().iterator(); queues.hasNext(); ) {
        final TreeSet<StreamRecord<IN>> queue = queues.next();

        // remove old elements, oldest first, until the head is inside the window again
        while (!queue.isEmpty() && queue.first().getTimestamp() + windowSize <= currentWatermark) {
            queue.pollFirst();
        }

        if (queue.isEmpty()) {
            // cleanup queue
            queues.remove();
            // send zero element
            System.out.println("Sending out zero element for inactive user in timespan " + windowSize);
            this.output.collect(new StreamRecord<>(zeroElement, currentWatermark));
        }
    }

    output.emitWatermark(mark);
}
/**
 * View over a timestamp-sorted queue that exposes the values of all records
 * whose timestamp is not greater than {@code maxTimestamp}.
 *
 * <p>Each call to {@link #iterator()} starts a fresh pass over the queue, so
 * the iterable can be traversed more than once.
 */
private class TimestampFilteringIterable implements Iterable<IN> {

    private final TreeSet<StreamRecord<IN>> queue;
    private final long maxTimestamp;

    /**
     * @param queue records sorted by ascending timestamp
     * @param maxTimestamp inclusive upper timestamp bound of the records to expose
     */
    public TimestampFilteringIterable(TreeSet<StreamRecord<IN>> queue, long maxTimestamp) {
        this.queue = Objects.requireNonNull(queue, "queue");
        this.maxTimestamp = maxTimestamp;
    }

    @Override
    public Iterator<IN> iterator() {
        return new TimestampFilteringIterator(queue.iterator());
    }

    /**
     * Iterator which stops at the first entry that is too far ahead of the
     * {@code maxTimestamp} window bound. Because the queue is sorted by
     * timestamp, every later entry is too far ahead as well.
     */
    private class TimestampFilteringIterator implements Iterator<IN> {

        private final Iterator<StreamRecord<IN>> queueIterator;
        /** Look-ahead record already pulled from the queue but not yet returned. */
        private StreamRecord<IN> next;
        /** Set once the queue ran out or a record beyond the bound was seen. */
        private boolean exhausted;

        private TimestampFilteringIterator(Iterator<StreamRecord<IN>> queueIterator) {
            this.queueIterator = queueIterator;
        }

        /**
         * Idempotent: repeated calls without an intervening {@link #next()}
         * do not advance the underlying queue iterator.
         */
        @Override
        public boolean hasNext() {
            if (next != null) {
                return true;
            }
            if (exhausted) {
                return false;
            }
            if (queueIterator.hasNext()) {
                StreamRecord<IN> candidate = queueIterator.next();
                if (candidate.getTimestamp() <= maxTimestamp) {
                    next = candidate;
                    return true;
                }
            }
            exhausted = true;
            return false;
        }

        /**
         * @return the value of the next record within the bound
         * @throws NoSuchElementException if no such record is left
         */
        @Override
        public IN next() {
            // Works with or without a preceding hasNext() call.
            if (!hasNext()) {
                throw new NoSuchElementException("No next element available.");
            }
            StreamRecord<IN> current = next;
            next = null;
            return current.getValue();
        }

        /** Removal is not supported; the view is read-only. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove");
        }
    }
}
/**
* Apply function which receives elements and performs an aggregation.
*
* <p>The operator calls it from {@code processElement} with the buffered
* elements of the current key, limited to timestamps up to the current
* watermark, and emits the returned value as a record.
*
* <p>The interface is {@link Serializable}; the operator keeps the function in
* a non-transient field.
*
* @param <IN> type of the elements handed to the function
* @param <OUT> type of the aggregation result
*/
private interface ApplyFunction<IN, OUT> extends Serializable {
/**
* Aggregates the given elements into a single result.
*
* @param elements the elements to aggregate
* @return the aggregation result
*/
OUT apply(Iterable<IN> elements);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment