Skip to content

Instantly share code, notes, and snippets.

@dev-jonghoonpark
Last active August 14, 2023 11:32
Show Gist options
  • Save dev-jonghoonpark/0e4e891fb1ca045d9f63efd86461f199 to your computer and use it in GitHub Desktop.
Save dev-jonghoonpark/0e4e891fb1ca045d9f63efd86461f199 to your computer and use it in GitHub Desktop.
public class AccountManager {
private ConcurrentHashMap<Integer, Account> accounts = new ConcurrentHashMap<>();
private volatile boolean shutdown = false;
private BlockingQueue<TransferTask> pending = new LinkedBlockingQueue<>();
private BlockingQueue<TransferTask> forDeposit = new LinkedBlockingQueue<>();
private BlockingQueue<TransferTask> failed = new LinkedBlockingQueue<>();
private Thread withdrawals;
private Thread deposits;
// ...
}
public Account createAccount(int balance) {
var out = new Account(balance);
accounts.put(out.getAccountId(), out);
return out;
}
public void submit(TransferTask transfer) {
if (shutdown) {
return false;
}
return pending.add(transfer);
}
Runnable deposit = () -> {
boolean interrupted = false;
while (!interrupted || !forDeposit.isEmpty()) {
try {
var task = forDeposit.take();
var receiver = task.receiver();
receiver.deposit(task.amount());
} catch (InterruptedException e) {
interrupted = true;
}
}
};
var manager = new AccountManager();
manager.init();
var acc1 = manager.createAccount(1000);
var acc2 = manager.createAccount(20_000);
var transfer = new TransferTask(acc1, acc2, 100);
manager.submit(transfer); // Submits the transfer from acc1 to acc2
Thread.sleep(5000); // Sleeps to allow time for the transfer to execute
System.out.println(acc1);
System.out.println(acc2);
manager.shutdown();
manager.await();
Runnable withdraw = () -> {
boolean interrupted = false;
while (!interrupted || !pending.isEmpty()) {
try {
var task = pending.take();
var sender = task.sender();
if (sender.withdraw(task.amount())) {
forDeposit.add(task);
} else {
failed.add(task);
}
} catch (InterruptedException e) {
interrupted = true;
}
}
deposits.interrupt();
};
Runnable withdraw = () -> {
LOOP:
while (!shutdown) {
try {
var task = pending.poll(5, TimeUnit.SECONDS);
if (task == null) {
continue LOOP;
}
var sender = task.sender();
if (sender.withdraw(task.amount())) {
forDeposit.put(task);
} else {
failed.put(task);
}
} catch (InterruptedException e) {
// Log at critical and proceed to next item
}
}
// Drain pending queue to failed or log
};
void init() {
// Runnable withdraw = ...
// Runnable deposit = ...
init(withdraw, deposit)
}
void init(Runnable withdraw, Runnable deposit) {
withdrawals = new Thread(withdraw);
deposits = new Thread(deposit);
withdrawals.start();
deposits.start();
}
/**
* The lock protecting all mutators. (We have a mild preference
* for builtin monitors over ReentrantLock when either will do.)
*/
final transient Object lock = new Object();
private transient volatile Object[] array;
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
var ls = new CopyOnWriteArrayList(List.of(1, 2, 3));
var it = ls.iterator();
ls.add(4);
var modifiedIt = ls.iterator();
while (it.hasNext()) {
System.out.println("Original: "+ it.next());
}
while (modifiedIt.hasNext()) {
System.out.println("Modified: "+ modifiedIt.next());
}
Original: 1
Original: 2
Original: 3
Modified: 1
Modified: 2
Modified: 3
Modified: 4
Future<Long> fut = getNthPrime(1_000_000_000);
try {
long result = fut.get(1, TimeUnit.MINUTES);
System.out.println("Found it: " + result);
} catch (TimeoutException tox) {
// Timed out - better cancel the task
System.err.println("Task timed out, cancelling");
fut.cancel(true);
} catch (InterruptedException e) {
fut.cancel(true);
throw e;
} catch (ExecutionException e) {
fut.cancel(true);
e.getCause().printStackTrace();
}
try {
Path file = Paths.get("/Users/karianna/foobar.txt");
var channel = AsynchronousFileChannel.open(file); // Opens the file asynchronously
var buffer = ByteBuffer.allocate(1_000_000);
Future<Integer> result = channel.read(buffer, 0); // Requests a read of up to one million bytes
BusinessProcess.doSomethingElse(); // Does something else
var bytesRead = result.get(); // Gets the result when ready
System.out.println("Bytes read [" + bytesRead + "]");
} catch (IOException | ExecutionException | InterruptedException e) {
e.printStackTrace();
}
public static Future<Long> getNthPrime(int n) {
var numF = new CompletableFuture<Long>(); // Creates the completable Future in an uncompleted state
new Thread(() -> {
long num = NumberService.findPrime(n); // The actual calculation complete the Future of the prime number
numF.complete(num); // Creates and starts a new thread that will The actual calculation complete the Future
}).start();
return numF;
}
public static Future<Long> getNthPrime(int n) {
return CompletableFuture.supplyAsync(() -> NumberService.findPrime(n));
}
public class TransferTask {
private final Account sender;
private final Account receiver;
private final int amount;
public TransferTask(Account sender, Account receiver, int amount) {
this.sender = sender;
this.receiver = receiver;
this.amount = amount;
}
public Account sender() {
return sender;
}
public int amount() {
return amount;
}
public Account receiver() {
return receiver;
}
// Other methods elided
}
public class WorkUnit<T> {
private final T workUnit;
public T getWork() {
return workUnit;
}
public WorkUnit(T workUnit) {
this.workUnit = workUnit;
}
// ... other methods elided
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment