Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save nanmu42/53fe23c3f15c777291108a7f4df62b9a to your computer and use it in GitHub Desktop.
Save nanmu42/53fe23c3f15c777291108a7f4df62b9a to your computer and use it in GitHub Desktop.
Flink: throttle messages to walkaround deadlock under heavy iteration feedback load
package com.ybsjyyn.bdc.logistics.operators;
import org.apache.flink.api.common.functions.MapFunction;
/**
* Throttle messages
* <p>
* ref: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
*/
public class ThrottledMapper<T> implements MapFunction<T, T> {
private final long sleepBatchSize;
private final long sleepBatchTime;
private long lastBatchCheckTime;
private long num;
public ThrottledMapper(long elementsPerSecond) {
if (elementsPerSecond >= 100) {
// how many elements would we emit per 50ms
this.sleepBatchSize = elementsPerSecond / 20;
this.sleepBatchTime = 50;
} else if (elementsPerSecond >= 1) {
// how long does element take
this.sleepBatchSize = 1;
this.sleepBatchTime = 1000 / elementsPerSecond;
} else {
throw new IllegalArgumentException(
"'elements per second' must be positive and not zero");
}
this.lastBatchCheckTime = System.currentTimeMillis();
}
@Override
public T map(T event) throws Exception {
num++;
if (num < sleepBatchSize) {
lastBatchCheckTime = System.currentTimeMillis();
return event;
}
num = 0;
final long elapsed = System.currentTimeMillis() - lastBatchCheckTime;
if (elapsed < sleepBatchTime) {
try {
Thread.sleep(sleepBatchTime - elapsed);
} catch (InterruptedException e) {
// restore interrupt flag and proceed
Thread.currentThread().interrupt();
}
}
lastBatchCheckTime = System.currentTimeMillis();
return event;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment