Skip to content

Instantly share code, notes, and snippets.

@jhaber
Created September 7, 2018 20:59
Show Gist options
  • Save jhaber/65eb9fc78413a419c3341b64db4fa2e9 to your computer and use it in GitHub Desktop.
Save jhaber/65eb9fc78413a419c3341b64db4fa2e9 to your computer and use it in GitHub Desktop.
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.Status;
public class StreamTimeoutClientCall<REQ, RES> extends SimpleForwardingClientCall<REQ, RES> {
private static final Logger LOG = LoggerFactory.getLogger(StreamTimeoutClientCall.class);
// add debounce so that we don't need to reset the timer after every message
// this should reduce timer overhead when there's a burst of messages
private static final Duration DEBOUNCE_INTERVAL = Duration.ofMillis(100);
private final Duration streamTimeout;
private final ScheduledExecutorService timeoutExecutor;
private final AtomicReference<ScheduledFuture<?>> streamTimeoutFuture;
private final AtomicLong lastTimerReset;
public StreamTimeoutClientCall(
ClientCall<REQ, RES> delegate,
Duration streamTimeout,
ScheduledExecutorService timeoutExecutor
) {
super(delegate);
this.streamTimeout = streamTimeout;
this.timeoutExecutor = timeoutExecutor;
this.streamTimeoutFuture = new AtomicReference<>();
// initialize this in such a way that the first call is outside the debounce window
this.lastTimerReset = new AtomicLong(System.nanoTime() - DEBOUNCE_INTERVAL.toNanos() - 1);
}
@Override
public void start(Listener<RES> responseListener, Metadata headers) {
resetStreamTimeout();
super.start(new SimpleForwardingClientCallListener<RES>(responseListener) {
@Override
public void onMessage(RES message) {
resetStreamTimeout();
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
cancelStreamTimeout();
super.onClose(status, trailers);
}
}, headers);
}
@Override
public void sendMessage(REQ message) {
resetStreamTimeout();
super.sendMessage(message);
}
private synchronized void resetStreamTimeout() {
if (debounce() && cancelStreamTimeout()) {
lastTimerReset.set(System.nanoTime());
streamTimeoutFuture.set(scheduleStreamTimeout());
}
}
private boolean debounce() {
long lastReset = lastTimerReset.get();
long now = System.nanoTime();
return (now - lastReset) > DEBOUNCE_INTERVAL.toNanos();
}
private boolean cancelStreamTimeout() {
ScheduledFuture<?> currentFuture = streamTimeoutFuture.get();
return currentFuture == null || currentFuture.cancel(false);
}
private ScheduledFuture<?> scheduleStreamTimeout() {
return timeoutExecutor.schedule(() -> {
try {
String message = String.format("RPC timed out after %sms of inactivity", streamTimeout.toMillis());
delegate().cancel(message, Status.DEADLINE_EXCEEDED.asException());
} catch (Throwable t) {
LOG.error("Error while attempting to cancel call", t);
throw t;
}
}, streamTimeout.plus(DEBOUNCE_INTERVAL).toNanos(), TimeUnit.NANOSECONDS);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment