Created
May 21, 2015 08:29
-
-
Save anidotnet/4ff41c7a8c7d53400709 to your computer and use it in GitHub Desktop.
disruptor-demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.abstractclass; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Demo App | |
*/ | |
public class App { | |
private static Logger logger = LoggerFactory.getLogger(App.class); | |
public static void main( String[] args ) { | |
LMAXWriter lmaxWriter = new LMAXWriter(); | |
logger.info("Initializing lmax disruptor."); | |
lmaxWriter.setRingBufferSize(7); //deliberately set. Final ring buffer size would be 8. | |
lmaxWriter.init(); | |
// submit messages to write concurrently using disruptor | |
for (int i = 0; i < 10; i++) { | |
lmaxWriter.submitMessage("Message Sequence " + i); | |
} | |
logger.info("All message submitted."); | |
lmaxWriter.close(); | |
logger.info("Program executed successfully."); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void init() { | |
// create a thread pool executor to be used by disruptor | |
Executor executor = Executors.newCachedThreadPool(); | |
// initialize our event factory | |
WriteEventFactory factory = new WriteEventFactory(); | |
if (ringBufferSize == 0) { | |
ringBufferSize = 1024; | |
} | |
// ring buffer size always has to be the power of 2. | |
// so if it is not, make it equal to the nearest integer. | |
double power = Math.log(ringBufferSize) / Math.log(2); | |
if (power % 1 != 0) { | |
power = Math.ceil(power); | |
ringBufferSize = (int) Math.pow(2, power); | |
logger.info("New ring buffer size = " + ringBufferSize); | |
} | |
// initialize our event handler. | |
WriteEventHandler handler = new WriteEventHandler(); | |
// initialize the disruptor | |
disruptor = new Disruptor<WriteEvent>(factory, ringBufferSize, executor); | |
disruptor.handleEventsWith(handler); | |
// set our custom exception handler | |
ExceptionHandler exceptionHandler = new WriteExceptionHandler(); | |
disruptor.handleExceptionsFor(handler).with(exceptionHandler); | |
// start the disruptor and get the generated ring buffer instance | |
disruptor.start(); | |
// initialize the event producer to submit messages | |
writeEventProducer = new WriteEventProducer(disruptor); | |
logger.info("Disruptor engine started successfully."); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.abstractclass; | |
/**
 * Mutable holder for a single message string.
 *
 * @author Anindya Chatterjee.
 */
public class WriteEvent {
    private String message;

    /**
     * Returns the message currently held by this event.
     *
     * @return the stored message, or {@code null} if none has been set
     */
    public String get() {
        return message;
    }

    /**
     * Stores the given message in this event, replacing any previous one.
     *
     * @param message the message to hold
     */
    public void set(String message) {
        this.message = message;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.abstractclass; | |
import com.lmax.disruptor.EventFactory; | |
/** | |
* @author Anindya Chatterjee. | |
*/ | |
public class WriteEventFactory implements EventFactory<WriteEvent> { | |
public WriteEvent newInstance() { | |
return new WriteEvent(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.abstractclass; | |
import com.lmax.disruptor.EventHandler; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* @author Anindya Chatterjee. | |
*/ | |
public class WriteEventHandler implements EventHandler<WriteEvent> { | |
private Logger logger = LoggerFactory.getLogger(getClass()); | |
public void onEvent(WriteEvent writeEvent, long sequence, boolean endOfBatch) throws Exception { | |
if (writeEvent != null && writeEvent.get() != null) { | |
String message = writeEvent.get(); | |
// Put you business logic here. | |
// here it will print only the submitted message. | |
logger.error(message + " processed."); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment