Skip to content

Instantly share code, notes, and snippets.

@chirino
Last active August 29, 2015 13:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chirino/8937804 to your computer and use it in GitHub Desktop.
Save chirino/8937804 to your computer and use it in GitHub Desktop.
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.net.NetServer;
import org.vertx.java.core.net.NetSocket;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BufferingTest {
public static void main(String[] args) throws Exception {
Vertx vertx = VertxFactory.newVertx();
NetServer server = vertx.createNetServer();
server.connectHandler(new Handler<NetSocket>() {
@Override
public void handle(final NetSocket socket) {
socket.dataHandler(new Handler<Buffer>() {
@Override
public void handle(Buffer event) {
// We only want to receive one buffer and then block the sender.
System.out.println("Received buffer. Length: " + event.length() +" bytes");
socket.pause();
}
});
}
});
FutureHandler<AsyncResult<NetServer>> listenFuture = new FutureHandler<AsyncResult<NetServer>>();
server.listen(0, listenFuture);
final int port = result(listenFuture).port();
final AtomicInteger writeCounter = new AtomicInteger(0);
new Thread("Write thread..") {
@Override
public void run() {
try {
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
OutputStream outputStream = socket.getOutputStream();
while (writeCounter.get() < 1024 * 1024) {
outputStream.write('x');
outputStream.flush();
writeCounter.incrementAndGet();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
while (true) {
Thread.sleep(1000);
System.out.println("Sent length: " + (writeCounter.get() / 1024.f) + "k ");
}
}
static public class FutureHandler<T> implements Handler<T> {
CountDownLatch done = new CountDownLatch(1);
T event;
@Override
public void handle(T event) {
this.event = event;
done.countDown();
}
T await() throws InterruptedException {
done.await();
return event;
}
T await(long timeout, TimeUnit unit) throws InterruptedException {
if (done.await(timeout, unit)) {
return event;
} else {
return null;
}
}
}
public static <T> T result(FutureHandler<AsyncResult<T>> future) throws Exception {
AsyncResult<T> asyncResult = future.await();
if (asyncResult.failed()) {
throw new Exception(asyncResult.cause());
} else {
return asyncResult.result();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment