Last active
July 8, 2016 09:05
-
-
Save TanyaGaleyev/e6948243d4e3b36ba86e53c8f0b315d9 to your computer and use it in GitHub Desktop.
Disruptor 100% CPU consumption when using SleepingWaitStrategy on 32-bit Linux
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.ivan.experiments.disruptor; | |
import com.lmax.disruptor.*; | |
import com.lmax.disruptor.dsl.Disruptor; | |
import com.lmax.disruptor.dsl.ProducerType; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.LockSupport; | |
public class HundredCpu { | |
public static void main(String[] args) throws InterruptedException { | |
WaitStrategy waitStrategy = new SleepingWaitStrategy(); | |
if (args.length == 0) { | |
} else if (args[0].equals("sleep")) { | |
waitStrategy = new SleepingWaitStrategy(); | |
} else if (args[0].equals("psleep")) { | |
waitStrategy = new ProfileFriendlySleepingWaitStrategy(); | |
} else if (args[0].equals("block")) { | |
waitStrategy = new BlockingWaitStrategy(); | |
} | |
Disruptor<MyEvent> disruptor = new Disruptor<>( | |
MyEvent::new, 1024, (ThreadFactory) Thread::new, ProducerType.MULTI, waitStrategy); | |
disruptor.handleEventsWith((EventHandler<MyEvent>) (event, sequence, endOfBatch) -> System.out.println(event)); | |
RingBuffer<MyEvent> ringBuffer = disruptor.start(); | |
for (int i = 0; i < 1000; i++) { | |
publishEvent(ringBuffer); | |
TimeUnit.SECONDS.sleep(1); | |
} | |
disruptor.shutdown(); | |
} | |
private static void publishEvent(RingBuffer<MyEvent> ringBuffer) { | |
long seq = ringBuffer.next(); | |
try { | |
MyEvent myEvent = ringBuffer.get(seq); | |
myEvent.timestamp = System.currentTimeMillis(); | |
} finally { | |
ringBuffer.publish(seq); | |
} | |
} | |
} | |
/**
 * Mutable event carrying a single {@code long} timestamp.
 */
class MyEvent {
    /** Timestamp value; zero until a caller assigns it. */
    long timestamp;

    /** Creates an event whose timestamp is still zero. */
    MyEvent() {
    }

    /**
     * Renders the event as {@code MyEvent{timestamp=<value>}}.
     *
     * @return the textual form of this event
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MyEvent{timestamp=");
        text.append(timestamp);
        text.append("}");
        return text.toString();
    }
}
class ProfileFriendlySleepingWaitStrategy implements WaitStrategy { | |
private static final int DEFAULT_RETRIES = 200; | |
private final int retries; | |
public ProfileFriendlySleepingWaitStrategy() | |
{ | |
this(DEFAULT_RETRIES); | |
} | |
public ProfileFriendlySleepingWaitStrategy(int retries) | |
{ | |
this.retries = retries; | |
} | |
@Override | |
public long waitFor( | |
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) | |
throws AlertException, InterruptedException | |
{ | |
long availableSequence; | |
int counter = retries; | |
while ((availableSequence = dependentSequence.get()) < sequence) | |
{ | |
counter = applyWaitMethod(barrier, counter); | |
} | |
return availableSequence; | |
} | |
@Override | |
public void signalAllWhenBlocking() | |
{ | |
} | |
private int applyWaitMethod(final SequenceBarrier barrier, int counter) | |
throws AlertException | |
{ | |
barrier.checkAlert(); | |
if (counter > 100) | |
{ | |
counter = spin(counter); | |
} | |
else if (counter > 0) | |
{ | |
counter = yield(counter); | |
} | |
else | |
{ | |
park(); | |
} | |
return counter; | |
} | |
private static int spin(int counter) { | |
return counter - 1; | |
} | |
private static int yield(int counter) { | |
Thread.yield(); | |
return counter - 1; | |
} | |
private static void park() { | |
LockSupport.parkNanos(1L); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment