Skip to content

Instantly share code, notes, and snippets.

@nhachicha
Created October 29, 2015 16:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save nhachicha/7596e912c81aae38d721 to your computer and use it in GitHub Desktop.
Save nhachicha/7596e912c81aae38d721 to your computer and use it in GitHub Desktop.
Examples using synchronizers (CountDownLatch, CyclicBarrier, Phaser)
// ********************************************************************* //
// ************************** CountDownLatch ************************** //
// ********************************************************************* //
public class RandomIntAverage {
CountDownLatch controller = new CountDownLatch(NB_THREADS);
public void randomIntAvg() throws InterruptedException {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
}
controller.await(); // wait until all participating threads finishes
computeAverage();
}
class Task implements Runnable {
@Override
public void run() {
queue.add(random.nextInt());
controller.countDown();
}
}
private final static int NB_THREADS = 3;
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Random random = new Random(System.currentTimeMillis());
void computeAverage () {
double sum = 0;
for (Integer random : queue) {
sum += random;
}
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
}
public static void main(String[] args) throws InterruptedException {
RandomIntAverage main = new RandomIntAverage();
main.randomIntAvg();
}
}
// ********************************************************************* //
// ************************** CountDownLatch ************************** //
// ********************************************************************* //
// basic UC
public class RandomIntAverage {
CyclicBarrier controller = new CyclicBarrier(NB_THREADS + 1); // account for main thread
public void randomIntAvg() throws InterruptedException, BrokenBarrierException {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
}
controller.await(); // wait until all participating threads finishes
computeAverage();
}
class Task implements Runnable {
@Override
public void run() {
queue.add(random.nextInt());
try {
controller.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
private final static int NB_THREADS = 3;
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Random random = new Random(System.currentTimeMillis());
void computeAverage () {
double sum = 0;
for (Integer random : queue) {
sum += random;
}
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
RandomIntAverage main = new RandomIntAverage();
main.randomIntAvg();
}
}
// Reusing the barrier
public class RandomIntAverageReusingBarrier {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NB_THREADS, new Aggregate());
public void randomAvg() {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
}
}
class Task implements Runnable {
@Override
public void run() {
try {
queue.add(random.nextInt());
cyclicBarrier.await();
// reusing the barrier
assert queue.size() == 0;
queue.add(random.nextInt());
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class Aggregate implements Runnable {
@Override
public void run() {
// All threads arrived at barrier
computeAverage();
// clear the queue for reuse
queue.clear();
}
}
private final static int NB_THREADS = 3;
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Random random = new Random(System.currentTimeMillis());
void computeAverage () {
double sum = 0;
for (Integer random : queue) {
sum += random;
}
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
}
public static void main(String[] args) {
RandomIntAverageReusingBarrier main = new RandomIntAverageReusingBarrier();
main.randomAvg();
}
}
// ********************************************************************* //
// ************************** Phaser ************************** //
// ********************************************************************* //
//basic UC
public class RandomIntAverage {
Phaser phaser = new Phaser(1);
public void randomIntAvg(int n) {
for (int i = 0; i < n; i++) {
phaser.register();
new Thread(new Task()).start();
}
phaser.arriveAndAwaitAdvance(); // wait until all registered threads arrives
computeAverage();
}
class Task implements Runnable {
@Override
public void run() {
queue.add(random.nextInt());
phaser.arrive();
}
}
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Random random = new Random(System.currentTimeMillis());
void computeAverage () {
double sum = 0;
for (Integer random : queue) {
sum += random;
}
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
}
public static void main(String[] args) {
RandomIntAverage main = new RandomIntAverage();
main.randomIntAvg(3);
}
}
// Resuing the barrier
public class RandomIntAverageMultiplePhases {
Phaser phaser = new Phaser(1) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase #" + phase);
computeAverage();
queue.clear();
// return true if the Phaser should terminate, false if Phaser should continue with next phase
return phase >= NB_PHASES - 1 || registeredParties == 0;
}
};
public void randomIntAvg(int n) {
for (int i = 0; i < n; i++) {
phaser.register();
new Thread(new Task()).start();
}
// main thread finished initialising, unregister to unlock waiting thread
// if we don't unregister the condition will be met once, but we want to reuse the barrier
// and trigger a new phase every time n registered thread arrives & not n+1
phaser.arriveAndDeregister(); // unregister to start reusing the barrier
}
class Task implements Runnable {
@Override
public void run() {
while (!phaser.isTerminated()) {
queue.add(random.nextInt());
phaser.arriveAndAwaitAdvance();
}
}
}
private final static int NB_PHASES = 2;
private final static int NB_THREADS = 3;
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
Random random = new Random(System.currentTimeMillis());
void computeAverage () {
double sum = 0;
for (Integer random : queue) {
sum += random;
}
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
}
public static void main(String[] args) {
RandomIntAverageMultiplePhases main = new RandomIntAverageMultiplePhases();
main.randomIntAvg(NB_THREADS);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment