Skip to content

Instantly share code, notes, and snippets.

@plevart
Created June 30, 2022 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save plevart/3084adc952df73811e6c117b5765a7d1 to your computer and use it in GitHub Desktop.
Save plevart/3084adc952df73811e6c117b5765a7d1 to your computer and use it in GitHub Desktop.
Test to provoke OOME in ReentrantLock
package concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class OOMEInReentrantLock implements Runnable {
public static void main(String[] args) {
// wait for VM threads to startup and allocate their resources before filling heap
// or else we might get "Exception in VM (AttachListener::init)" when run from IDEA
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interrupt", e);
}
System.out.println("\nStarting test using MonitorLock ...");
test(new MonitorLock(), 4);
System.out.println("... test using MonitorLock passes");
System.out.println("\nStarting test using ReentrantLockLock ...");
test(new ReentrantLockLock(), 4);
System.out.println("... test using ReentrantLockLock passes");
}
static void test(Lock lock, int threadCount) {
var currentJobId = new AtomicInteger();
var jobs = IntStream
.range(0, threadCount)
.mapToObj(i -> new OOMEInReentrantLock(lock, i, (i + 1) % threadCount, currentJobId))
.toArray(OOMEInReentrantLock[]::new);
var threads = Stream
.of(jobs)
.map(Thread::new)
.toArray(Thread[]::new);
Stream.of(threads).forEach(Thread::start);
long t0 = System.nanoTime();
long t1 = fillHeapAndJoin(threads);
// attempt to free heap
System.gc();
RuntimeException exception = null;
for (var job : jobs) {
if (job.exception != null) {
if (exception == null) {
exception = new RuntimeException("Job(s) failed");
}
exception.addSuppressed(job.exception);
}
}
System.out.println(
"... fillHeap time: " + (t1 - t0) / 1000_000 +
" millis, whole test time: " + (System.nanoTime() - t0) / 1000_000 +
" millis"
);
if (exception != null) {
throw exception;
}
}
static long fillHeapAndJoin(Thread[] threads) {
Object data = fillHeap();
long t = System.nanoTime();
// from now on be careful not to allocate...
for (var thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interrupt", e);
}
}
if (data == null) {
throw new NullPointerException("data");
}
data = null;
return t;
}
final Lock lock;
final int jobId, nextJobId;
final AtomicInteger currentJobId;
Throwable exception;
OOMEInReentrantLock(Lock lock, int jobId, int nextJobId, AtomicInteger currentJobId) {
this.lock = lock;
this.jobId = jobId;
this.nextJobId = nextJobId;
this.currentJobId = currentJobId;
}
@Override
public void run() {
try {
// wait before proceeding to allow heap to be filled
Thread.sleep(1000L);
for (int i = 0; i < 1000 && currentJobId.get() >= 0; i++) {
lock.withLock(iteration);
}
} catch (Throwable e) {
exception = e;
currentJobId.set(-1); // signal for all threads to terminate
}
}
// pre-allocate and not use lambda to avoid allocations during memory shortage
final Consumer<Lock> iteration = new Consumer<>() {
@Override
public void accept(Lock lock) {
try {
int curJobId;
while ((curJobId = currentJobId.get()) != jobId && curJobId >= 0) {
lock.await();
}
if (curJobId == jobId && currentJobId.compareAndSet(curJobId, nextJobId)) {
lock.signal();
}
} catch (InterruptedException e) {
throw new IllegalStateException("Unexpected interrupt", e);
}
}
};
static Object[] fillHeap() {
Object[] first = null, last = null;
int size = 1 << 20;
while (size > 0) {
try {
Object[] array = new Object[size];
if (first == null) {
first = array;
} else {
last[0] = array;
}
last = array;
} catch (OutOfMemoryError oome) {
size = size >>> 1;
}
}
return first;
}
interface Lock {
void withLock(Consumer<Lock> task);
void signal();
void await() throws InterruptedException;
}
static class ReentrantLockLock implements Lock {
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();
@Override
public void withLock(Consumer<Lock> task) {
lock.lock();
try {
task.accept(this);
} finally {
lock.unlock();
}
}
@Override
public void signal() {
cond.signalAll();
}
@Override
public void await() throws InterruptedException {
cond.await(100L, TimeUnit.MILLISECONDS);
}
}
static class MonitorLock implements Lock {
private final Object lock = new Object();
@Override
public void withLock(Consumer<Lock> task) {
synchronized (lock) {
task.accept(this);
}
}
@Override
public void signal() {
lock.notifyAll();
}
@Override
public void await() throws InterruptedException {
lock.wait(100L);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment