Created
June 30, 2022 11:49
-
-
Save plevart/3084adc952df73811e6c117b5765a7d1 to your computer and use it in GitHub Desktop.
Test to provoke OOME in ReentrantLock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package concurrent; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
import java.util.function.Consumer; | |
import java.util.stream.IntStream; | |
import java.util.stream.Stream; | |
public class OOMEInReentrantLock implements Runnable { | |
public static void main(String[] args) { | |
// wait for VM threads to startup and allocate their resources before filling heap | |
// or else we might get "Exception in VM (AttachListener::init)" when run from IDEA | |
try { | |
Thread.sleep(1000L); | |
} catch (InterruptedException e) { | |
throw new IllegalStateException("Unexpected interrupt", e); | |
} | |
System.out.println("\nStarting test using MonitorLock ..."); | |
test(new MonitorLock(), 4); | |
System.out.println("... test using MonitorLock passes"); | |
System.out.println("\nStarting test using ReentrantLockLock ..."); | |
test(new ReentrantLockLock(), 4); | |
System.out.println("... test using ReentrantLockLock passes"); | |
} | |
static void test(Lock lock, int threadCount) { | |
var currentJobId = new AtomicInteger(); | |
var jobs = IntStream | |
.range(0, threadCount) | |
.mapToObj(i -> new OOMEInReentrantLock(lock, i, (i + 1) % threadCount, currentJobId)) | |
.toArray(OOMEInReentrantLock[]::new); | |
var threads = Stream | |
.of(jobs) | |
.map(Thread::new) | |
.toArray(Thread[]::new); | |
Stream.of(threads).forEach(Thread::start); | |
long t0 = System.nanoTime(); | |
long t1 = fillHeapAndJoin(threads); | |
// attempt to free heap | |
System.gc(); | |
RuntimeException exception = null; | |
for (var job : jobs) { | |
if (job.exception != null) { | |
if (exception == null) { | |
exception = new RuntimeException("Job(s) failed"); | |
} | |
exception.addSuppressed(job.exception); | |
} | |
} | |
System.out.println( | |
"... fillHeap time: " + (t1 - t0) / 1000_000 + | |
" millis, whole test time: " + (System.nanoTime() - t0) / 1000_000 + | |
" millis" | |
); | |
if (exception != null) { | |
throw exception; | |
} | |
} | |
static long fillHeapAndJoin(Thread[] threads) { | |
Object data = fillHeap(); | |
long t = System.nanoTime(); | |
// from now on be careful not to allocate... | |
for (var thread : threads) { | |
try { | |
thread.join(); | |
} catch (InterruptedException e) { | |
throw new IllegalStateException("Unexpected interrupt", e); | |
} | |
} | |
if (data == null) { | |
throw new NullPointerException("data"); | |
} | |
data = null; | |
return t; | |
} | |
/** Lock (with its wait/signal facility) this job runs its iterations under. */
final Lock lock;
/** Id of this job; the job acts when {@link #currentJobId} equals this value. */
final int jobId;
/** Id written to {@link #currentJobId} once this job has taken its turn. */
final int nextJobId;
/** Id of the job whose turn it is; set to {@code -1} by {@link #run()} when a job fails. */
final AtomicInteger currentJobId;
/** Failure caught by {@link #run()}, or {@code null} if none was caught. */
Throwable exception;

/**
 * Creates a job.
 *
 * @param lock         lock implementation to run iterations under
 * @param jobId        id of this job
 * @param nextJobId    id of the job that gets the turn after this one
 * @param currentJobId counter holding the id of the job whose turn it is
 */
OOMEInReentrantLock(Lock lock, int jobId, int nextJobId, AtomicInteger currentJobId) {
    this.currentJobId = currentJobId;
    this.nextJobId = nextJobId;
    this.jobId = jobId;
    this.lock = lock;
}
@Override | |
public void run() { | |
try { | |
// wait before proceeding to allow heap to be filled | |
Thread.sleep(1000L); | |
for (int i = 0; i < 1000 && currentJobId.get() >= 0; i++) { | |
lock.withLock(iteration); | |
} | |
} catch (Throwable e) { | |
exception = e; | |
currentJobId.set(-1); // signal for all threads to terminate | |
} | |
} | |
// pre-allocate and not use lambda to avoid allocations during memory shortage | |
final Consumer<Lock> iteration = new Consumer<>() { | |
@Override | |
public void accept(Lock lock) { | |
try { | |
int curJobId; | |
while ((curJobId = currentJobId.get()) != jobId && curJobId >= 0) { | |
lock.await(); | |
} | |
if (curJobId == jobId && currentJobId.compareAndSet(curJobId, nextJobId)) { | |
lock.signal(); | |
} | |
} catch (InterruptedException e) { | |
throw new IllegalStateException("Unexpected interrupt", e); | |
} | |
} | |
}; | |
static Object[] fillHeap() { | |
Object[] first = null, last = null; | |
int size = 1 << 20; | |
while (size > 0) { | |
try { | |
Object[] array = new Object[size]; | |
if (first == null) { | |
first = array; | |
} else { | |
last[0] = array; | |
} | |
last = array; | |
} catch (OutOfMemoryError oome) { | |
size = size >>> 1; | |
} | |
} | |
return first; | |
} | |
/**
 * Minimal lock abstraction used by the test: run a task under the lock, and wait/signal
 * from within such a task.
 */
interface Lock {

    /**
     * Runs {@code task} while holding the lock, passing this lock to it.
     *
     * @param task code to execute under the lock
     */
    void withLock(Consumer<Lock> task);

    /** Wakes up threads blocked in {@link #await()}. */
    void signal();

    /**
     * Blocks the calling thread until signalled; implementations may also return on a timeout.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    void await() throws InterruptedException;
}
static class ReentrantLockLock implements Lock { | |
private final ReentrantLock lock = new ReentrantLock(); | |
private final Condition cond = lock.newCondition(); | |
@Override | |
public void withLock(Consumer<Lock> task) { | |
lock.lock(); | |
try { | |
task.accept(this); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
@Override | |
public void signal() { | |
cond.signalAll(); | |
} | |
@Override | |
public void await() throws InterruptedException { | |
cond.await(100L, TimeUnit.MILLISECONDS); | |
} | |
} | |
static class MonitorLock implements Lock { | |
private final Object lock = new Object(); | |
@Override | |
public void withLock(Consumer<Lock> task) { | |
synchronized (lock) { | |
task.accept(this); | |
} | |
} | |
@Override | |
public void signal() { | |
lock.notifyAll(); | |
} | |
@Override | |
public void await() throws InterruptedException { | |
lock.wait(100L); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment