Skip to content

Instantly share code, notes, and snippets.

@rohit-sachan
Last active April 1, 2020 14:10
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save rohit-sachan/8413977 to your computer and use it in GitHub Desktop.
Save rohit-sachan/8413977 to your computer and use it in GitHub Desktop.
Sample example for lmax disruptor
package prodcons;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
/**
* Created by Rohit Sachan on 1/13/14.
*/
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
try{
Message msg;
long startTime = System.nanoTime();
System.out.println();
//consuming messages until exit message is received
while((msg = queue.take()).getMsg() !=-1){
Integer msgStr = msg.getMsg();
if(msgStr % 10000 == 0)
System.out.println("Consumed "+msgStr);
}
double timeINnanos = (System.nanoTime()-startTime);
System.out.println(timeINnanos + " taken in Nanos");
double timetaken = (timeINnanos/1e9);
NumberFormat nf = NumberFormat.getNumberInstance(Locale.US);
DecimalFormat df = (DecimalFormat)nf;
System.out.printf("Total time taken in consuming %d, messages %n", Producer.maxMsg * Producer.multiply);
System.out.println("Speed = "+df.format(Producer.maxMsg*Producer.multiply/timetaken) + " msgs per sec");
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
package prodcons;
import com.lmax.disruptor.EventFactory;
/**
* Created by Rohit Sachan on 1/13/14.
*/
public class Message {
private Integer msg;
public void setMsg(Integer msg) {
this.msg = msg;
}
public Integer getMsg() {
return msg;
}
public final static EventFactory<Message> EVENT_FACTORY = new EventFactory<Message>() {
@Override
public Message newInstance() {
return new Message();
}
};
}
package prodcons;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.BlockingQueue;
/**
* Created by Rohit Sachan on 1/13/14.
*/
public class Producer implements Runnable {
static public Integer maxMsg = 1000000;
static public int multiply = 10;
private RingBuffer<Message> rb;
public Producer(RingBuffer rb){
this.rb=rb;
}
@Override
public void run() {
for (int i =0; i < maxMsg * multiply ; i++){
long seq = rb.next();
Message msg= rb.get(seq);
msg.setMsg(i);
rb.publish(seq);
}
System.out.println("done sending " + maxMsg*multiply + " messages");
}
}
package prodcons.disruptor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
/**
* Created by Rohit Sachan on 1/13/14.
*/
public class Sample2 {
public static void main(String[] args) {
/*//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(10000000);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");*/
ExecutorService exec = Executors.newCachedThreadPool();
// Preallocate RingBuffer with 1024 ValueEvents
Disruptor<Message> disruptor = new Disruptor<Message>(Message.EVENT_FACTORY, 2048, exec,
ProducerType.SINGLE, new BusySpinWaitStrategy());
// make handler
final long startTime = System.nanoTime();
final EventHandler<Message> handler = new EventHandler<Message>() {
// event will eventually be recycled by the Disruptor after it wraps
public void onEvent(final Message event, final long sequence, final boolean endOfBatch) throws Exception {
Integer value = event.getMsg();
if(value % 10000 == 0){
System.out.println("By Handler : ValueEvent: " + value + " Sequence: " + sequence);
double timeINnanos = (System.nanoTime()-startTime);
double timetaken = (timeINnanos/1e9);
System.out.println("Time Taken till now in sec " + timetaken );
}
}
};
final EventHandler<Message> handler2 = new EventHandler<Message>() {
// event will eventually be recycled by the Disruptor after it wraps
public void onEvent(final Message event, final long sequence, final boolean endOfBatch) throws Exception {
Integer value = event.getMsg();
if(value % 10000 == 0){
System.out.println("By Handler2 :ValueEvent: " + value + " Sequence: " + sequence);
double timeINnanos = (System.nanoTime()-startTime);
double timetaken = (timeINnanos/1e9);
System.out.println("Time Taken till now in sec " + timetaken );
}
}
};
// register handler with disruptor
disruptor.handleEventsWith(handler,handler2);
RingBuffer<Message> ringBuffer = disruptor.start();
Producer producer = new Producer(ringBuffer);
//starting producer to produce messages in queue
Thread p = new Thread(producer);
p.start();
try {
p.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment