Skip to content

Instantly share code, notes, and snippets.

@dmikusa
Last active December 29, 2015 10:49
Show Gist options
  • Save dmikusa/7660005 to your computer and use it in GitHub Desktop.
Save dmikusa/7660005 to your computer and use it in GitHub Desktop.
package org.apache.catalina.nonblocking;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.startup.BytesStreamer;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.tomcat.util.buf.ByteChunk;
import org.junit.Assert;
import org.junit.Test;
public class TestEchoListener extends TomcatBaseTest {
public static final int BUFFER_SIZE = 8 * 1024;
private static final int TOTAL_SIZE = 5 * 1048576;
@Test
public void testEchoListener1() throws Exception {
Tomcat tomcat = getTomcatInstance();
// Must have a real docBase - just use temp
StandardContext ctx = (StandardContext) tomcat.addContext("",
System.getProperty("java.io.tmpdir"));
EchoAsync2Servlet servlet = new EchoAsync2Servlet();
String servletName = EchoAsync2Servlet.class.getName();
Tomcat.addServlet(ctx, servletName, servlet);
ctx.addServletMapping("/", servletName);
tomcat.start();
Map<String, List<String>> resHeaders = new HashMap<>();
int rc = postUrl(true, new StaticBytesStreamer(TOTAL_SIZE, 131072, 0), "http://localhost:" +
getPort() + "/", new ByteChunk(), resHeaders, null);
Assert.assertEquals(HttpServletResponse.SC_OK, rc);
}
private static class StaticBytesStreamer implements BytesStreamer {
private int available = 10;
private int total = 10;
private int chunkSize = 1;
private int sleep = 0;
private int sent = 0;
public StaticBytesStreamer(int total, int chunkSize, int sleep) {
this.total = total;
this.available = total;
this.chunkSize = chunkSize;
this.sleep = sleep;
}
public int getLength() {
return total;
}
public int available() {
return available;
}
public byte[] next() {
available -= chunkSize;
sent += chunkSize;
byte[] data = CharBuffer.allocate(chunkSize).toString().replace('\0', 'a').getBytes();
if (sleep > 0) {
try {
Thread.sleep(sleep * 1000);
} catch (InterruptedException ex) {
// ignore
}
}
System.out.println("Send [" + sent + "] with [" + available + "] data left to send");
return data;
}
}
@WebServlet(asyncSupported = true)
public class EchoAsync2Servlet extends HttpServlet {
private static final long serialVersionUID = -3038617032944575769L;
public static final int BUFFER_SIZE = 8 * 1024;
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync(request, response);
asyncContext.setTimeout(0);
Echoer echoer = new Echoer(asyncContext);
response.getOutputStream().setWriteListener(echoer);
request.getInputStream().setReadListener(echoer);
}
private class Echoer implements ReadListener, WriteListener {
private final byte[] buffer = new byte[BUFFER_SIZE];
private final AsyncContext asyncContext;
private final ServletInputStream input;
private final ServletOutputStream output;
private long totalRead = 0;
private Echoer(AsyncContext asyncContext) throws IOException {
this.asyncContext = asyncContext;
this.input = asyncContext.getRequest().getInputStream();
this.output = asyncContext.getResponse().getOutputStream();
}
@Override
public void onDataAvailable() throws IOException {
while (input.isReady()) {
if (output.isReady()) {
int read = input.read(buffer);
totalRead += read;
output.write(buffer, 0, read);
getServletContext().log("read and wrote [" + read + "] total [" + totalRead + " of " + TOTAL_SIZE + "]");
} else {
return;
}
}
}
@Override
public void onAllDataRead() throws IOException {
asyncContext.complete();
}
@Override
public void onWritePossible() throws IOException {
if (input.isFinished()) {
onAllDataRead();
}
else {
onDataAvailable();
}
}
@Override
public void onError(Throwable failure) {
failure.printStackTrace();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment