Skip to content

Instantly share code, notes, and snippets.

@jentfoo
Created March 2, 2016 15:47
Show Gist options
  • Save jentfoo/ea7197dac948d7cf2317 to your computer and use it in GitHub Desktop.
Save jentfoo/ea7197dac948d7cf2317 to your computer and use it in GitHub Desktop.
Example non-blocking Jetty parallelism filter
package com.fullcontact.load.sandbox;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.threadly.concurrent.future.ListenableFuture;
import org.threadly.concurrent.future.Watchdog;
import org.threadly.concurrent.limiter.KeyedExecutorLimiter;
/**
 * Servlet filter that limits how many requests sharing the same key may be processed in
 * parallel, without blocking the container thread while a request waits for its turn.
 * <p>
 * Each keyed request is put into asynchronous mode and queued on a
 * {@link KeyedExecutorLimiter}. When the limiter runs the queued task the request is dispatched
 * back through the container, marked with {@link #RESUMED_ATTRIBUTE_KEY} so that this filter lets
 * it straight through on the second pass. Requests that are still queued when the
 * {@link Watchdog} cancels their task are answered with {@code 503 Service Unavailable}.
 */
@Slf4j
public abstract class DosParallismControlFilter implements Filter {
  /** Request attribute set on a request that has already been flow controlled and re-dispatched. */
  protected static final String RESUMED_ATTRIBUTE_KEY = "dosFilterResumed";

  protected final KeyedExecutorLimiter rateLimiter;
  protected final Watchdog watchdog;

  /**
   * Constructs the filter.
   *
   * @param executor Executor the per-key limiter submits the resume tasks to
   * @param parallismPerKeyLimit Limit handed to the {@link KeyedExecutorLimiter} for each key
   * @param maximumDelayMillis Timeout in milliseconds handed to the {@link Watchdog} which
   *                           watches each queued request
   */
  protected DosParallismControlFilter(Executor executor, int parallismPerKeyLimit, int maximumDelayMillis) {
    this.rateLimiter = new KeyedExecutorLimiter(executor, parallismPerKeyLimit);
    // second argument: whether to interrupt the running thread on cancel (see threadly Watchdog docs)
    this.watchdog = new Watchdog(maximumDelayMillis, false);
  }

  /**
   * Get key to categorize request. Requests are only flow controlled in relation to the rates
   * from other requests with a matching key (identified by hashCode and equals). If
   * {@code null} is returned then no flow control will be applied.
   *
   * @param request Request to extract key from
   * @return Object to represent request flow control, or {@code null} to not restrict request
   */
  protected abstract Object getRequestKey(HttpServletRequest request);

  /**
   * Applies flow control to the request, or passes it straight down the chain when it is a
   * resumed request, is not an HTTP request, or has no key.
   *
   * @param request Incoming request
   * @param response Response associated with the request
   * @param chain Remaining filter chain
   * @throws IOException Propagated from the filter chain
   * @throws ServletException Propagated from the filter chain
   */
  @Override
  public void doFilter(ServletRequest request,
                       ServletResponse response,
                       FilterChain chain) throws IOException, ServletException {
    final Object resumedMarker = request.getAttribute(RESUMED_ATTRIBUTE_KEY);
    if (resumedMarker != null) {
      log.info("Running resumed request: {}", resumedMarker);
      // resuming a previously flow controlled request, no need to go through the logic again
      chain.doFilter(request, response);
      return;
    }

    if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
      // keys can only be extracted from HTTP requests, anything else is passed through untouched
      chain.doFilter(request, response);
      return;
    }
    final HttpServletRequest httpRequest = (HttpServletRequest) request;
    final HttpServletResponse httpResponse = (HttpServletResponse) response;

    final Object requestKey = getRequestKey(httpRequest);
    if (requestKey == null) {
      // no key indicates that this request should not be flow controlled, allow passthrough
      chain.doFilter(request, response);
      return;
    }

    final AsyncContext asyncContext = request.startAsync();
    // Ensures exactly one of "dispatch" or "reject" acts on the async context, even if the
    // cancellation lands while the resume task is already running.
    final AtomicBoolean handled = new AtomicBoolean(false);

    final ListenableFuture<?> f = rateLimiter.submit(requestKey, () -> {
      if (handled.compareAndSet(false, true)) {
        asyncContext.getRequest().setAttribute(RESUMED_ATTRIBUTE_KEY, Boolean.TRUE);
        asyncContext.dispatch();
      }
    });

    // Without this the async context of a cancelled request would never be completed and the
    // client would be left waiting for the container's own async timeout.
    f.addListener(() -> {
      if (f.isCancelled() && handled.compareAndSet(false, true)) {
        rejectTimedOut(asyncContext, httpResponse);
      }
    });

    // watch so we can timeout the request if we can't handle it in time
    watchdog.watch(f);
  }

  /**
   * Responds to a request whose queued task was cancelled before it could be dispatched.
   *
   * @param asyncContext Async context of the request that is being rejected
   * @param httpResponse Response to set the error status on
   */
  private void rejectTimedOut(AsyncContext asyncContext, HttpServletResponse httpResponse) {
    try {
      httpResponse.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
    } finally {
      // always release the request, even if setting the status failed
      asyncContext.complete();
    }
  }

  @Override
  public void destroy() {
    // ignored
  }

  @Override
  public void init(FilterConfig arg0) {
    // ignored
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment