Skip to content

Instantly share code, notes, and snippets.

@hszander
Created January 1, 2012 14:55
Show Gist options
  • Save hszander/1547524 to your computer and use it in GitHub Desktop.
Save hszander/1547524 to your computer and use it in GitHub Desktop.
Disruptor RingBuffer-based minimalist NIO HTTP test server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.Disruptor;
/**
 * Minimalist NIO HTTP test server built on two Disruptor ring buffers.
 *
 * <p>The main thread runs a {@link Selector} loop: it accepts connections, registers them for
 * {@code OP_READ}, and publishes one {@link SelectionEvent} per readiness notification to a
 * {@link WorkerPool}. Each worker reads whatever bytes are available and answers with a fixed
 * {@code "Hello World!"} response. Read buffers are preallocated in a second ring buffer and a
 * slot is claimed by the selector thread for every event it dispatches.
 *
 * @author Jon Brisbin <jon@jbrisbin.com>
 */
public class DisruptorTest {

  /** Size in bytes of every pooled read buffer and of the socket send/receive buffers. */
  private static final int BUFFER_SIZE = 8 * 1024;

  /** Largest shift for which {@code 1 << n} is still a positive {@code int}. */
  private static final int MAX_RING_EXPONENT = 30;

  /** The canned HTTP response; header bytes are pure ASCII, so the charset is pinned. */
  private static final byte[] RESPONSE =
      ("HTTP/1.1 200 OK\r\n" +
          "Connection: Keep-Alive\r\n" +
          "Content-Type: text/plain\r\n" +
          "Content-Length: 12\r\n\r\n" +
          "Hello World!").getBytes(StandardCharsets.US_ASCII);

  /**
   * Starts the server on {@code 127.0.0.1:3000} and runs the selector loop; it never returns
   * normally.
   *
   * @param args ignored
   * @throws IOException if the server socket or selector cannot be opened, or a channel
   *     operation in the selector loop fails
   * @throws InterruptedException declared for compatibility; not thrown by the visible code
   * @throws AlertException declared for compatibility; not thrown by the visible code
   * @throws TimeoutException declared for compatibility; not thrown by the visible code
   */
  public static void main(String[] args) throws InterruptedException, IOException, AlertException, TimeoutException {
    final int cores = Runtime.getRuntime().availableProcessors();
    final Executor executor = Executors.newFixedThreadPool(cores);
    // 2^(cores + 1) slots, computed with a shift; the exponent is bounded so the result stays
    // a positive power of two even on machines with a very large core count.
    final int ringSize = 1 << Math.min(cores + 1, MAX_RING_EXPONENT);

    // Ring of preallocated read buffers.
    final RingBuffer<ByteBuffer> bufferRing = new Disruptor<>(
        new EventFactory<ByteBuffer>() {
          @Override public ByteBuffer newInstance() {
            return ByteBuffer.allocate(BUFFER_SIZE);
          }
        },
        executor,
        new SingleThreadedClaimStrategy(ringSize),
        new BlockingWaitStrategy()
    ).start();

    // One handler per core; generic array creation is unavoidable here, hence the narrow
    // suppression.
    @SuppressWarnings("unchecked")
    WorkHandler<SelectionEvent>[] handlers = new WorkHandler[cores];
    for (int i = 0; i < handlers.length; i++) {
      handlers[i] = new RequestHandler(bufferRing);
    }

    // Use a WorkerPool for handling requests
    WorkerPool<SelectionEvent> acceptPool = new WorkerPool<SelectionEvent>(
        new EventFactory<SelectionEvent>() {
          @Override public SelectionEvent newInstance() {
            return new SelectionEvent();
          }
        },
        new SingleThreadedClaimStrategy(ringSize),
        new BlockingWaitStrategy(),
        new FatalExceptionHandler(),
        handlers);
    final RingBuffer<SelectionEvent> workerRing = acceptPool.start(executor);

    final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    // Listening-socket options are set exactly once and before bind(): changing SO_REUSEADDR
    // on an already-bound socket has undefined behaviour.
    serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
    final Selector selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 3000), 1024);

    // Allocate the first worker slot; dispatch() always returns the next pre-claimed one.
    long workerId = workerRing.next();
    while (true) {
      int ready = 0;
      try {
        ready = selector.select();
      } catch (CancelledKeyException ignored) {
        // There's a bug on Mac OS X's JVM that might throw a bogus CancelledKeyException
      }
      if (ready <= 0) {
        continue;
      }
      Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
      while (keys.hasNext()) {
        SelectionKey key = keys.next();
        keys.remove();
        try {
          if (!key.isValid()) {
            continue;
          }
          if (key.isAcceptable()) {
            if (serverSocketChannel.isOpen()) {
              // Drain every connection that is currently pending on the listening socket.
              SocketChannel channel;
              while ((channel = serverSocketChannel.accept()) != null) {
                channel.configureBlocking(false);
                channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                channel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE);
                channel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE);
                // The event must carry the key of the NEW connection. Handing the accept key
                // to a worker lets a failed first read cancel the listening registration.
                SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ);
                workerId = dispatch(
                    workerRing, bufferRing, workerId, channel, channelKey, selector, serverSocketChannel);
              }
            }
          } else if (key.isReadable() || key.isWritable()) {
            // NOTE(review): the key stays registered for OP_READ while a worker is busy, so
            // the same channel can presumably be dispatched again before its data has been
            // consumed - confirm whether interest ops should be suspended during handling.
            workerId = dispatch(
                workerRing, bufferRing, workerId, (SocketChannel) key.channel(), key, selector, serverSocketChannel);
          }
        } catch (CancelledKeyException e) {
          key.channel().close();
        }
      }
    }
  }

  /**
   * Fills the already-claimed worker slot, claims a read buffer slot for it, publishes the event
   * to the worker pool and claims the next worker slot.
   *
   * @param workerRing ring feeding the worker pool
   * @param bufferRing ring of preallocated read buffers
   * @param workerId sequence in {@code workerRing} that was claimed earlier and is filled now
   * @param channel connection the worker should read from
   * @param key selection key belonging to {@code channel}
   * @param selector selector the channel is registered with
   * @param serverChannel the listening channel
   * @return the next claimed sequence in {@code workerRing}, to be passed to the following call
   */
  private static long dispatch(
      RingBuffer<SelectionEvent> workerRing,
      RingBuffer<ByteBuffer> bufferRing,
      long workerId,
      SocketChannel channel,
      SelectionKey key,
      Selector selector,
      ServerSocketChannel serverChannel) {
    SelectionEvent event = workerRing.get(workerId);
    event.id = workerId;
    event.channel = channel;
    event.selector = selector;
    event.key = key;
    event.serverChannel = serverChannel;
    // Claimed from the selector thread only (both rings use a SingleThreadedClaimStrategy).
    event.bufferId = bufferRing.next();
    workerRing.publish(workerId);
    return workerRing.next();
  }

  /**
   * Tells whether an I/O failure merely means that the peer (or another thread) already closed
   * the connection. Null-safe: {@link ClosedChannelException} and others carry no message.
   */
  private static boolean isExpectedDisconnect(IOException e) {
    if (e instanceof ClosedChannelException) {
      return true;
    }
    String message = e.getMessage();
    return "Connection reset by peer".equals(message) || "Broken pipe".equals(message);
  }

  /** Closes the channel, ignoring failures: nothing useful can be done about them here. */
  private static void closeQuietly(SocketChannel channel) {
    try {
      channel.close();
    } catch (IOException ignored) {
      // The connection is being discarded anyway.
    }
  }

  /** Worker that reads the available request bytes and replies with the canned response. */
  private static final class RequestHandler implements WorkHandler<SelectionEvent> {
    private final RingBuffer<ByteBuffer> bufferRing;
    /** Per-handler view of the response; duplicated for each write so positions never clash. */
    private final ByteBuffer response = ByteBuffer.wrap(RESPONSE);

    RequestHandler(RingBuffer<ByteBuffer> bufferRing) {
      this.bufferRing = bufferRing;
    }

    /**
     * Reads from the event's channel into its assigned buffer and, if any bytes arrived, writes
     * the response. End of stream or an I/O failure closes the connection; a zero-byte read
     * (nothing available yet on a non-blocking channel) leaves it registered.
     *
     * @param ev event filled in by the selector thread
     * @throws Exception never thrown deliberately; declared by {@link WorkHandler}
     */
    @Override public void onEvent(SelectionEvent ev) throws Exception {
      ByteBuffer buffer = bufferRing.get(ev.bufferId);
      buffer.clear();
      try {
        int read;
        try {
          read = ev.channel.read(buffer);
        } catch (IOException e) {
          if (!isExpectedDisconnect(e)) {
            e.printStackTrace();
          }
          read = -1;
        } catch (CancelledKeyException e) {
          read = -1;
        }

        if (read > 0) {
          // The request bytes are not parsed: any received data triggers the fixed response.
          respond(ev);
        } else if (read < 0) {
          // End of stream or failure: deregister AND close, otherwise the socket leaks.
          ev.key.cancel();
          closeQuietly(ev.channel);
        }
      } finally {
        // Publish the buffer slot claimed by the selector thread (the original author's intent
        // is to hand the ByteBuffer back for re-use).
        bufferRing.publish(ev.bufferId);
      }
    }

    /** Writes the canned response, closing the connection if the write fails. */
    private void respond(SelectionEvent ev) {
      try {
        // NOTE(review): a non-blocking write may be partial; the return value is not checked
        // on the assumption that this short response fits into the socket send buffer.
        ev.channel.write(response.duplicate());
      } catch (IOException e) {
        if (!isExpectedDisconnect(e)) {
          e.printStackTrace();
        }
        closeQuietly(ev.channel);
      } catch (CancelledKeyException e) {
        // Close the channel if something goes wrong
        closeQuietly(ev.channel);
      }
    }
  }

  /** Mutable ring-buffer slot describing one readiness notification for a connection. */
  private static class SelectionEvent {
    /** Sequence of this event in the worker ring. */
    long id;
    Selector selector;
    ServerSocketChannel serverChannel;
    /** Key of {@link #channel}; cancelled by the handler when the connection ends. */
    SelectionKey key;
    SocketChannel channel;
    /** Sequence of the read buffer assigned to this event; {@code -1} until first use. */
    long bufferId = -1L;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment