Created
March 2, 2016 15:47
-
-
Save jentfoo/ea7197dac948d7cf2317 to your computer and use it in GitHub Desktop.
Example non-blocking jetty parallism filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.fullcontact.load.sandbox; | |
import java.io.IOException; | |
import java.util.concurrent.Executor; | |
import javax.servlet.AsyncContext; | |
import javax.servlet.Filter; | |
import javax.servlet.FilterChain; | |
import javax.servlet.FilterConfig; | |
import javax.servlet.ServletException; | |
import javax.servlet.ServletRequest; | |
import javax.servlet.ServletResponse; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import lombok.extern.slf4j.Slf4j; | |
import org.threadly.concurrent.future.ListenableFuture; | |
import org.threadly.concurrent.future.Watchdog; | |
import org.threadly.concurrent.limiter.KeyedExecutorLimiter; | |
@Slf4j | |
public abstract class DosParallismControlFilter implements Filter { | |
protected static final String RESUMED_ATTRIBUTE_KEY = "dosFilterResumed"; | |
protected final KeyedExecutorLimiter rateLimiter; | |
protected final Watchdog watchdog; | |
protected DosParallismControlFilter(Executor executor, int parallismPerKeyLimit, int maximumDelayMillis) { | |
this.rateLimiter = new KeyedExecutorLimiter(executor, parallismPerKeyLimit); | |
watchdog = new Watchdog(maximumDelayMillis, false); | |
} | |
/** | |
* Get key to categorize request. Requests are only flow controlled in relation to the rates | |
* from other requests with a matching key (identified by hashCode and equals). If | |
* {@code null} is returned then no flow control will be applied. | |
* | |
* @param request Request to extract key from | |
* @return Object to represent request flow control, or {@code null} to not restrict request | |
*/ | |
protected abstract Object getRequestKey(HttpServletRequest request); | |
@Override | |
public void doFilter(ServletRequest request, | |
ServletResponse response, | |
FilterChain chain) throws IOException, ServletException { | |
if (request.getAttribute(RESUMED_ATTRIBUTE_KEY) != null) { | |
log.info("Running resumed request: " + request.getAttribute(RESUMED_ATTRIBUTE_KEY)); | |
// resuming a previously flow controlled request, no need to go through the logic again | |
chain.doFilter(request, response); | |
return; | |
} | |
HttpServletRequest httpRequest = (HttpServletRequest)request; | |
HttpServletResponse httpResponse = (HttpServletResponse)response; | |
Object requestKey = getRequestKey(httpRequest); | |
if (requestKey == null) { | |
// no key indicates that this request should not be flow controlled, allow passthrough | |
chain.doFilter(request, response); | |
return; | |
} | |
final AsyncContext asyncContext = request.startAsync(); | |
ListenableFuture<?> f = rateLimiter.submit(requestKey, () -> { | |
asyncContext.getRequest().setAttribute(RESUMED_ATTRIBUTE_KEY, Boolean.TRUE); | |
asyncContext.dispatch(); | |
}); | |
// watch so we can timeout the request if we can't handle it in time | |
watchdog.watch(f); | |
} | |
@Override | |
public void destroy() { | |
// ignored | |
} | |
@Override | |
public void init(FilterConfig arg0) { | |
// ignored | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment