Instantly share code, notes, and snippets.

Embed
What would you like to do?
package fs.lock;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
public class LockingOfByteBufferAllocation {
static final int MB = 1024 * 1024;
/**
* ex)
* java -Xmx100m fs.lock.LockingOfByteBufferAllocation 1 5000 20
* java -Xmx100m fs.lock.LockingOfByteBufferAllocation 2 5000 20
*/
public static void main(String... args) {
if (args.length != 3) {
System.out.println("args 0: 1|2, args 1: IterationCount, args 2: allocationMB");
System.exit(-1);
}
boolean isType1 = "1".equals(args[0]);
int iterCount = Integer.parseInt(args[1]);
int allocationMB = Integer.parseInt(args[2]) * 1000 * 1000;
System.out.format("isType1 %s, iterCount %d, allocationMB %d%n",
isType1,
iterCount,
allocationMB);
mallocTest(isType1, iterCount, allocationMB);
}
private static void mallocTest(boolean isType1, int iterCount, int allocationMB) {
AtomicInteger counter = new AtomicInteger(iterCount);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
Allocate allocate = new Allocate(counter, errorRef, allocationMB);
long time = System.currentTimeMillis();
AtomicLong last = new AtomicLong(time);
new Thread() {
@Override
public void run() {
while (true) {
if (errorRef.get() != null) {
System.err.println("Woops!");
errorRef.get().printStackTrace();
System.exit(-1);
}
if (counter.get() == 0) {
System.out.println(System.currentTimeMillis() - time);
System.exit(-1);
}
if (System.currentTimeMillis() - last.get() > 1000) {
Runtime runtime = Runtime.getRuntime();
System.out.format("Max %d Total %d Used %d%n",
runtime.maxMemory() / MB,
runtime.totalMemory() / MB,
(runtime.totalMemory() - runtime.freeMemory()) / MB);
last.set(System.currentTimeMillis());
}
}
}
}.start();
try {
IntStream.range(0, iterCount)
.forEach(i -> new T(allocate, isType1).start());
} catch (OutOfMemoryError e) {
errorRef.set(e);
}
}
}
class T extends Thread {
private final Allocate allocate;
private final boolean isType1;
T(Allocate allocate, boolean isType1) {
this.allocate = allocate;
this.isType1 = isType1;
}
@Override
public void run() {
ByteBuffer buffer;
if (isType1)
buffer = allocate.mallocType1();
else
buffer = allocate.mallocType2();
if (buffer != null)
buffer.put("TEST".getBytes());
}
}
class Allocate {
private final AtomicInteger counter;
private final AtomicReference<Throwable> errorRef;
ReentrantLock lock = new ReentrantLock();
int CAPACITY;
Allocate(AtomicInteger counter, AtomicReference<Throwable> errorRef, int allocationMB) {
this.counter = counter;
this.errorRef = errorRef;
this.CAPACITY = allocationMB;
}
ByteBuffer mallocType1() {
lock.lock();
try {
return ByteBuffer.allocate(CAPACITY);
} catch (OutOfMemoryError e) {
errorRef.set(e);
return null;
} finally {
lock.unlock();
counter.decrementAndGet();
}
}
ByteBuffer mallocType2() {
lock.lock();
try {
lock.unlock();
return ByteBuffer.allocate(CAPACITY);
} catch (OutOfMemoryError e) {
errorRef.set(e);
return null;
} finally {
counter.decrementAndGet();
}
}
}
package fs.lock;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockCondition {
public static void main(String... args) {
Share share = new Share();
new AwaitTestThread("A", share, "").start();
new AwaitTestThread("B", share, "\t\t\t\t").start();
// signal을 호출 하지 않으면 await에 지정된 시간이 지난 후
// 쓰레드가 await 이후 로직을 수행하지만, 블록된 쓰레드 순서대로 실행되지 않는다
// timer.schedule(..)을 제거해 보면 확인 할 수 있다.
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
share.signalTest();
}
}, 100);
}
}
class AwaitTestThread extends Thread {
private final Share share;
private final String name;
private final String prefix;
public AwaitTestThread(String name, Share share, String prefix) {
super(name);
this.name = name;
this.share = share;
this.prefix = prefix;
}
@Override
public void run() {
while (true) {
this.share.awaitTest(this.name, this.prefix);
}
}
}
class Share {
ReentrantLock lock = new ReentrantLock();
Deque<Condition> waiters = new ArrayDeque<>();
AtomicInteger counter = new AtomicInteger(0);
public void awaitTest(String name, String prefix) {
lock.lock();
try {
int i = counter.incrementAndGet();
Condition condition = lock.newCondition();
waiters.addLast(condition);
System.out.println(prefix + name + i);
try {
condition.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
System.out.println("###" + e.getMessage());
}
System.out.println(prefix + "\t" + name + i);
Condition condition1 = waiters.removeFirst();
if (condition != condition1) {
throw new IllegalStateException(Thread.currentThread().getName() + ": Wooops");
}
} finally {
lock.unlock();
}
}
public void signalTest() {
lock.lock();
try {
Condition condition = waiters.peekFirst();
if (condition != null) {
condition.signal();
}
} finally {
lock.unlock();
}
}
}
package fs.lock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
public class ReentrantLockConditionBasic {
public static void main(String... args) {
int iterCount = 20;
ShareBasic share = new ShareBasic(iterCount);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit((Runnable) () ->
IntStream.range(0, iterCount).forEach(i -> share.doUp())
);
executorService.submit((Runnable) () -> {
try {
while (true) {
share.doDown();
}
} catch (InterruptedException e) {
executorService.shutdown();
}
});
}
}
class ShareBasic {
int iterCount;
Lock lock = new ReentrantLock();
Condition condition;
int interCount;
// volatile: https://twitter.com/_freestrings/status/688963217784082432
volatile boolean isUp;
public ShareBasic(int iterCount) {
this.iterCount = iterCount;
condition = lock.newCondition();
}
void doUp() {
lock.lock();
try {
while (!isUp) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isUp = false;
System.out.println("");
condition.signal();
} finally {
lock.unlock();
}
}
void doDown() throws InterruptedException {
if (interCount++ == iterCount) {
throw new InterruptedException();
}
lock.lock();
try {
while (isUp) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isUp = true;
System.out.print("");
condition.signal();
} finally {
lock.unlock();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment