Skip to content

Instantly share code, notes, and snippets.

@manuzhang
Created February 24, 2017 02:16
Show Gist options
  • Save manuzhang/1fe3a12b6ae557d79d277eb34639e914 to your computer and use it in GitHub Desktop.
Save manuzhang/1fe3a12b6ae557d79d277eb34639e914 to your computer and use it in GitHub Desktop.
Patch that adds metrics to WindowOperator
Index: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java (date 1487603477000)
+++ flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java (revision )
@@ -38,6 +38,8 @@
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -69,8 +71,11 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -189,6 +194,18 @@
*/
private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+ /** (window, key) pairs recorded while processing elements; backs the "Window Count" gauge. */
+ private transient Set<Tuple2<W, K>> windows;
+
+ /** Gauge reporting the current size of {@link #windows}. */
+ private transient Gauge<Integer> windowCount;
+
+ // NOTE(review): not read or written anywhere in the visible patch -- confirm it is still needed.
+ private transient List<W> emittedWindows;
+
+ /** Incremented once per call to {@code emitWindowContents}. */
+ private transient Counter emittedWindowCount;
+
+ /** Incremented each time an element's window is skipped because it is already late. */
+ private transient Counter lateWindowCount;
+
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
*/
@@ -300,6 +317,10 @@
}
registerRestoredLegacyStateState();
+ windows = new HashSet<>();
+ windowCount = getMetricGroup().gauge("Window Count", new WindowCountGauge<>(windows));
+ emittedWindowCount = getMetricGroup().counter("Emitted Window Count");
+ lateWindowCount = getMetricGroup().counter("Late Window Count");
}
@Override
@@ -354,8 +375,13 @@
}
});
+ if (actualWindow == window) {
+ windows.add(Tuple2.of(window, key));
+ }
+
// drop if the window is already late
if (isLate(actualWindow)) {
+ lateWindowCount.inc();
mergingWindows.retireWindow(actualWindow);
continue;
}
@@ -392,8 +418,10 @@
} else {
for (W window: elementWindows) {
+ windows.add(Tuple2.of(window, key));
// drop if the window is already late
if (isLate(window)) {
+ lateWindowCount.inc();
continue;
}
@@ -541,6 +569,7 @@
*/
@SuppressWarnings("unchecked")
private void emitWindowContents(W window, ACC contents) throws Exception {
+ emittedWindowCount.inc(); // one increment per call, before the user function is applied
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
@@ -770,6 +799,20 @@
}
}
+ private static class WindowCountGauge<W> implements Gauge<Integer> { // gauge exposing a collection's size
+ /**
+ * Collection whose size is reported. It is stored by reference, so elements added
+ * after construction are reflected in later readings.
+ * NOTE(review): in the visible patch the backing set only ever has elements added;
+ * confirm entries are removed when windows are cleaned up, otherwise this value only grows.
+ */
+ private final Collection<W> windows;
+ /**
+ * Creates a gauge over the given collection. No defensive copy is made.
+ *
+ * @param windows collection whose size this gauge reports
+ */
+ public WindowCountGauge(Collection<W> windows) {
+ this.windows = windows;
+ }
+ /**
+ * Returns the size of the tracked collection at the moment of the call.
+ * NOTE(review): presumably invoked from a metrics reporter thread while the operator
+ * mutates the collection -- verify that an unsynchronized read is acceptable here.
+ */
+ @Override
+ public Integer getValue() {
+ return windows.size();
+ }
+ }
+
/**
* Internal class for keeping track of in-flight timers.
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment