Skip to content

Instantly share code, notes, and snippets.

@anidotnet
Created May 21, 2015 08:29
Show Gist options
  • Save anidotnet/4ff41c7a8c7d53400709 to your computer and use it in GitHub Desktop.
Save anidotnet/4ff41c7a8c7d53400709 to your computer and use it in GitHub Desktop.
disruptor-demo
package org.abstractclass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Demo App
*/
public class App {
private static Logger logger = LoggerFactory.getLogger(App.class);
public static void main( String[] args ) {
LMAXWriter lmaxWriter = new LMAXWriter();
logger.info("Initializing lmax disruptor.");
lmaxWriter.setRingBufferSize(7); //deliberately set. Final ring buffer size would be 8.
lmaxWriter.init();
// submit messages to write concurrently using disruptor
for (int i = 0; i < 10; i++) {
lmaxWriter.submitMessage("Message Sequence " + i);
}
logger.info("All message submitted.");
lmaxWriter.close();
logger.info("Program executed successfully.");
}
}
public void init() {
// create a thread pool executor to be used by disruptor
Executor executor = Executors.newCachedThreadPool();
// initialize our event factory
WriteEventFactory factory = new WriteEventFactory();
if (ringBufferSize == 0) {
ringBufferSize = 1024;
}
// ring buffer size always has to be the power of 2.
// so if it is not, make it equal to the nearest integer.
double power = Math.log(ringBufferSize) / Math.log(2);
if (power % 1 != 0) {
power = Math.ceil(power);
ringBufferSize = (int) Math.pow(2, power);
logger.info("New ring buffer size = " + ringBufferSize);
}
// initialize our event handler.
WriteEventHandler handler = new WriteEventHandler();
// initialize the disruptor
disruptor = new Disruptor<WriteEvent>(factory, ringBufferSize, executor);
disruptor.handleEventsWith(handler);
// set our custom exception handler
ExceptionHandler exceptionHandler = new WriteExceptionHandler();
disruptor.handleExceptionsFor(handler).with(exceptionHandler);
// start the disruptor and get the generated ring buffer instance
disruptor.start();
// initialize the event producer to submit messages
writeEventProducer = new WriteEventProducer(disruptor);
logger.info("Disruptor engine started successfully.");
}
package org.abstractclass;
/**
* @author Anindya Chatterjee.
*/
public class WriteEvent {
private String message;
public void set(String message){
this.message = message;
}
public String get() {
return this.message;
}
}
package org.abstractclass;
import com.lmax.disruptor.EventFactory;
/**
* @author Anindya Chatterjee.
*/
public class WriteEventFactory implements EventFactory<WriteEvent> {
public WriteEvent newInstance() {
return new WriteEvent();
}
}
package org.abstractclass;
import com.lmax.disruptor.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Anindya Chatterjee.
*/
public class WriteEventHandler implements EventHandler<WriteEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
public void onEvent(WriteEvent writeEvent, long sequence, boolean endOfBatch) throws Exception {
if (writeEvent != null && writeEvent.get() != null) {
String message = writeEvent.get();
// Put you business logic here.
// here it will print only the submitted message.
logger.error(message + " processed.");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment