Skip to content

Instantly share code, notes, and snippets.

@TanyaGaleyev
Last active July 8, 2016 09:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TanyaGaleyev/e6948243d4e3b36ba86e53c8f0b315d9 to your computer and use it in GitHub Desktop.
Save TanyaGaleyev/e6948243d4e3b36ba86e53c8f0b315d9 to your computer and use it in GitHub Desktop.
Disruptor 100% CPU consumption when using SleepingWaitStrategy on 32 bit Linux
package org.ivan.experiments.disruptor;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class HundredCpu {
public static void main(String[] args) throws InterruptedException {
WaitStrategy waitStrategy = new SleepingWaitStrategy();
if (args.length == 0) {
} else if (args[0].equals("sleep")) {
waitStrategy = new SleepingWaitStrategy();
} else if (args[0].equals("psleep")) {
waitStrategy = new ProfileFriendlySleepingWaitStrategy();
} else if (args[0].equals("block")) {
waitStrategy = new BlockingWaitStrategy();
}
Disruptor<MyEvent> disruptor = new Disruptor<>(
MyEvent::new, 1024, (ThreadFactory) Thread::new, ProducerType.MULTI, waitStrategy);
disruptor.handleEventsWith((EventHandler<MyEvent>) (event, sequence, endOfBatch) -> System.out.println(event));
RingBuffer<MyEvent> ringBuffer = disruptor.start();
for (int i = 0; i < 1000; i++) {
publishEvent(ringBuffer);
TimeUnit.SECONDS.sleep(1);
}
disruptor.shutdown();
}
private static void publishEvent(RingBuffer<MyEvent> ringBuffer) {
long seq = ringBuffer.next();
try {
MyEvent myEvent = ringBuffer.get(seq);
myEvent.timestamp = System.currentTimeMillis();
} finally {
ringBuffer.publish(seq);
}
}
}
class MyEvent {
long timestamp;
MyEvent() {;
}
@Override
public String toString() {
return "MyEvent{timestamp=" + timestamp + "}";
}
}
class ProfileFriendlySleepingWaitStrategy implements WaitStrategy {
private static final int DEFAULT_RETRIES = 200;
private final int retries;
public ProfileFriendlySleepingWaitStrategy()
{
this(DEFAULT_RETRIES);
}
public ProfileFriendlySleepingWaitStrategy(int retries)
{
this.retries = retries;
}
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = retries;
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert();
if (counter > 100)
{
counter = spin(counter);
}
else if (counter > 0)
{
counter = yield(counter);
}
else
{
park();
}
return counter;
}
private static int spin(int counter) {
return counter - 1;
}
private static int yield(int counter) {
Thread.yield();
return counter - 1;
}
private static void park() {
LockSupport.parkNanos(1L);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment