Last active
December 29, 2015 10:49
-
-
Save dmikusa/7660005 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.catalina.nonblocking; | |
import java.io.IOException; | |
import java.nio.CharBuffer; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import javax.servlet.AsyncContext; | |
import javax.servlet.ReadListener; | |
import javax.servlet.ServletException; | |
import javax.servlet.ServletInputStream; | |
import javax.servlet.ServletOutputStream; | |
import javax.servlet.WriteListener; | |
import javax.servlet.annotation.WebServlet; | |
import javax.servlet.http.HttpServlet; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import org.apache.catalina.core.StandardContext; | |
import org.apache.catalina.startup.BytesStreamer; | |
import org.apache.catalina.startup.Tomcat; | |
import org.apache.catalina.startup.TomcatBaseTest; | |
import org.apache.tomcat.util.buf.ByteChunk; | |
import org.junit.Assert; | |
import org.junit.Test; | |
public class TestEchoListener extends TomcatBaseTest { | |
public static final int BUFFER_SIZE = 8 * 1024; | |
private static final int TOTAL_SIZE = 5 * 1048576; | |
@Test | |
public void testEchoListener1() throws Exception { | |
Tomcat tomcat = getTomcatInstance(); | |
// Must have a real docBase - just use temp | |
StandardContext ctx = (StandardContext) tomcat.addContext("", | |
System.getProperty("java.io.tmpdir")); | |
EchoAsync2Servlet servlet = new EchoAsync2Servlet(); | |
String servletName = EchoAsync2Servlet.class.getName(); | |
Tomcat.addServlet(ctx, servletName, servlet); | |
ctx.addServletMapping("/", servletName); | |
tomcat.start(); | |
Map<String, List<String>> resHeaders = new HashMap<>(); | |
int rc = postUrl(true, new StaticBytesStreamer(TOTAL_SIZE, 131072, 0), "http://localhost:" + | |
getPort() + "/", new ByteChunk(), resHeaders, null); | |
Assert.assertEquals(HttpServletResponse.SC_OK, rc); | |
} | |
private static class StaticBytesStreamer implements BytesStreamer { | |
private int available = 10; | |
private int total = 10; | |
private int chunkSize = 1; | |
private int sleep = 0; | |
private int sent = 0; | |
public StaticBytesStreamer(int total, int chunkSize, int sleep) { | |
this.total = total; | |
this.available = total; | |
this.chunkSize = chunkSize; | |
this.sleep = sleep; | |
} | |
public int getLength() { | |
return total; | |
} | |
public int available() { | |
return available; | |
} | |
public byte[] next() { | |
available -= chunkSize; | |
sent += chunkSize; | |
byte[] data = CharBuffer.allocate(chunkSize).toString().replace('\0', 'a').getBytes(); | |
if (sleep > 0) { | |
try { | |
Thread.sleep(sleep * 1000); | |
} catch (InterruptedException ex) { | |
// ignore | |
} | |
} | |
System.out.println("Send [" + sent + "] with [" + available + "] data left to send"); | |
return data; | |
} | |
} | |
@WebServlet(asyncSupported = true) | |
public class EchoAsync2Servlet extends HttpServlet { | |
private static final long serialVersionUID = -3038617032944575769L; | |
public static final int BUFFER_SIZE = 8 * 1024; | |
@Override | |
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException | |
{ | |
AsyncContext asyncContext = request.startAsync(request, response); | |
asyncContext.setTimeout(0); | |
Echoer echoer = new Echoer(asyncContext); | |
response.getOutputStream().setWriteListener(echoer); | |
request.getInputStream().setReadListener(echoer); | |
} | |
private class Echoer implements ReadListener, WriteListener { | |
private final byte[] buffer = new byte[BUFFER_SIZE]; | |
private final AsyncContext asyncContext; | |
private final ServletInputStream input; | |
private final ServletOutputStream output; | |
private long totalRead = 0; | |
private Echoer(AsyncContext asyncContext) throws IOException { | |
this.asyncContext = asyncContext; | |
this.input = asyncContext.getRequest().getInputStream(); | |
this.output = asyncContext.getResponse().getOutputStream(); | |
} | |
@Override | |
public void onDataAvailable() throws IOException { | |
while (input.isReady()) { | |
if (output.isReady()) { | |
int read = input.read(buffer); | |
totalRead += read; | |
output.write(buffer, 0, read); | |
getServletContext().log("read and wrote [" + read + "] total [" + totalRead + " of " + TOTAL_SIZE + "]"); | |
} else { | |
return; | |
} | |
} | |
} | |
@Override | |
public void onAllDataRead() throws IOException { | |
asyncContext.complete(); | |
} | |
@Override | |
public void onWritePossible() throws IOException { | |
if (input.isFinished()) { | |
onAllDataRead(); | |
} | |
else { | |
onDataAvailable(); | |
} | |
} | |
@Override | |
public void onError(Throwable failure) { | |
failure.printStackTrace(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment