Skip to content

Instantly share code, notes, and snippets.

@anandrajneesh
Created August 25, 2018 08:42
Show Gist options
  • Save anandrajneesh/a90815e43ed8ac7ed80b3602076fb073 to your computer and use it in GitHub Desktop.
Save anandrajneesh/a90815e43ed8ac7ed80b3602076fb073 to your computer and use it in GitHub Desktop.
barriers
package multithreading.horsetourney;
import java.util.concurrent.BrokenBarrierException;
/**
 * Synchronisation contract shared by every participant of a race.
 *
 * <p>A {@code Horse} calls {@link #awaitGunShot()} before it starts running and
 * {@link #crossedFinishLine(Horse)} once it is done; the organiser calls
 * {@link #stats()} to obtain the results.
 */
public abstract class Barrier {

    /** Number of participants this barrier was created for. */
    protected final int count;

    /**
     * @param participantsCount number of participants taking part in the race
     */
    public Barrier(int participantsCount) {
        this.count = participantsCount;
    }

    /**
     * Blocks the calling participant until the race is started.
     *
     * @throws InterruptedException   if the calling thread is interrupted while waiting
     * @throws BrokenBarrierException if the underlying barrier is broken while waiting
     */
    public abstract void awaitGunShot()
            throws InterruptedException, BrokenBarrierException;

    /**
     * Reports that {@code horse} has finished the race.
     *
     * @param horse the participant that just finished
     * @throws BrokenBarrierException if the underlying barrier is broken while waiting
     * @throws InterruptedException   if the calling thread is interrupted while waiting
     */
    public abstract void crossedFinishLine(Horse horse)
            throws BrokenBarrierException, InterruptedException;

    /**
     * Produces the race results.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public abstract void stats() throws InterruptedException;
}
package multithreading.horsetourney;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
/**
 * {@link Barrier} built from three {@link CountDownLatch}es.
 *
 * <ul>
 *   <li>every horse counts down the "ready" latch and then parks on the start signal;</li>
 *   <li>{@link #stats()} waits until all horses are ready, records the start time,
 *       releases the start signal and then waits for every horse to finish;</li>
 *   <li>each finishing horse stores its elapsed time and counts down the "finished" latch.</li>
 * </ul>
 *
 * <p>Note: {@link #stats()} waits for exactly {@code participantsCount} horses to call
 * {@link #awaitGunShot()} and {@link #crossedFinishLine(Horse)}.
 */
public class CountDownLatchBarrier extends Barrier {

    /** Single-count latch opened by {@link #stats()} to start the race. */
    private final CountDownLatch startSignal;

    /** Counted down once per horse that reached the start line. */
    private final CountDownLatch readySignal;

    /** Counted down once per horse that crossed the finish line. */
    private final CountDownLatch doneSignal;

    /** Elapsed time in milliseconds per finished horse. */
    private final ConcurrentMap<Horse, Long> elapsedByHorse;

    /** Wall-clock time (ms) captured just before the start signal is released. */
    private long startTime;

    /**
     * @param participantsCount number of horses that must reach the start and the finish line
     */
    public CountDownLatchBarrier(int participantsCount) {
        super(participantsCount);
        this.startSignal = new CountDownLatch(1);
        this.elapsedByHorse = new ConcurrentHashMap<>();
        this.doneSignal = new CountDownLatch(participantsCount);
        this.readySignal = new CountDownLatch(participantsCount);
    }

    /** Marks the calling horse as ready, then blocks until the start signal is released. */
    @Override
    public void awaitGunShot() throws InterruptedException {
        readySignal.countDown();
        startSignal.await();
    }

    /**
     * Records the time {@code horse} needed since the start and marks it as finished.
     *
     * @param horse the horse that just finished
     */
    @Override
    public void crossedFinishLine(Horse horse) {
        final long finishedAt = System.currentTimeMillis();
        final long elapsed = finishedAt - startTime;
        elapsedByHorse.put(horse, elapsed);
        doneSignal.countDown();
    }

    /**
     * Starts the race once every horse is ready, waits for all of them to finish and
     * prints the results ordered by ascending elapsed time.
     */
    @Override
    public void stats() throws InterruptedException {
        readySignal.await();
        // Written before the start signal is released, so every horse released by
        // that latch observes this value.
        startTime = System.currentTimeMillis();
        startSignal.countDown();
        doneSignal.await();

        System.out.println("Results : ");
        final Comparator<Map.Entry<Horse, Long>> byElapsedTime =
                Comparator.comparingLong(Map.Entry::getValue);
        elapsedByHorse.entrySet()
                .stream()
                .sorted(byElapsedTime)
                .forEach(entry -> System.out.println(entry));
    }
}
package multithreading.horsetourney;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
/**
 * {@link Barrier} built from two {@link CyclicBarrier}s.
 *
 * <p>The start barrier has one party per horse; its barrier action records the start
 * time right before the horses are released. The finish barrier has one party per horse
 * <em>plus one</em> for the thread calling {@link #stats()}, so the results are only
 * printed after every horse has crossed the finish line.
 */
public class CyclicBarrierBarrier extends Barrier {

    /** Elapsed time in milliseconds per finished horse. */
    private final ConcurrentMap<Horse, Long> stats;

    /** Trips when all horses are at the start line. */
    private final CyclicBarrier gunShot;

    /** Trips when all horses have finished and the reporting thread is waiting. */
    private final CyclicBarrier crossedFinishLine;

    /** Wall-clock time (ms) written by the start barrier's action. */
    private long startTime;

    /**
     * @param participantsCount number of horses taking part in the race
     */
    public CyclicBarrierBarrier(int participantsCount) {
        super(participantsCount);
        stats = new ConcurrentHashMap<>();
        // The barrier action runs before any waiting horse is released, so each horse
        // sees the recorded start time once await() returns.
        gunShot = new CyclicBarrier(participantsCount, () -> startTime = System.currentTimeMillis());
        // One extra party: the thread calling stats() waits here for the race to end.
        crossedFinishLine = new CyclicBarrier(participantsCount + 1);
    }

    /** Blocks the calling horse until every horse has reached the start line. */
    @Override
    public void awaitGunShot() throws InterruptedException, BrokenBarrierException {
        gunShot.await();
    }

    /**
     * Records the elapsed time of {@code horse} and waits until all other horses have
     * finished and {@link #stats()} has been called.
     *
     * @param horse the horse that just finished
     */
    @Override
    public void crossedFinishLine(Horse horse) throws BrokenBarrierException, InterruptedException {
        long now = System.currentTimeMillis();
        stats.put(horse, now - startTime);
        crossedFinishLine.await();
    }

    /**
     * Waits until every horse has crossed the finish line, then prints the results
     * ordered by ascending elapsed time.
     *
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the finish barrier was broken (for example because
     *                               a horse was interrupted) before the race completed
     */
    @Override
    public void stats() throws InterruptedException {
        try {
            crossedFinishLine.await();
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(
                    "Race was aborted before every horse crossed the finish line", e);
        }
        System.out.println("Results : ");
        stats.entrySet()
                .stream()
                .sorted(Comparator.comparingLong(Map.Entry::getValue))
                .forEach(System.out::println);
    }
}
package multithreading.horsetourney;
import java.util.Arrays;
import java.util.Scanner;
/**
 * Console front end for the horse tourney.
 *
 * <p>Reads one command per line from standard input and applies it to the current
 * {@link Tourney}. Invalid input and failing commands are reported on the console and
 * the command loop keeps running; the loop ends on {@code QUIT} or end of input.
 */
public class Driver {

    /** Commands understood by the console loop. */
    private enum Command {
        NEW,
        START,
        REGISTER,
        DEREGISTER,
        QUIT,
        UNKNOWN;

        /**
         * Maps a line of user input to a command, ignoring case and surrounding whitespace.
         *
         * @param input raw line typed by the user, may be {@code null}
         * @return the matching command, or {@link #UNKNOWN} when nothing matches
         */
        public static Command getCommandFromInput(String input) {
            if (input == null) {
                return UNKNOWN;
            }
            String normalized = input.trim();
            for (Command value : values()) {
                if (value.name().equalsIgnoreCase(normalized)) {
                    return value;
                }
            }
            return UNKNOWN;
        }
    }

    public static void main(String[] args) {
        new Driver().start();
    }

    /** Runs the interactive command loop until QUIT is entered or input ends. */
    private void start() {
        try (Scanner scanner = new Scanner(System.in)) {
            Tourney tourney = null;
            while (true) {
                System.out.println("Enter one of the following commands " + Arrays.toString(Command.values()));
                if (!scanner.hasNextLine()) {
                    // End of input (e.g. piped stdin): leave the loop instead of throwing.
                    break;
                }
                Command command = Command.getCommandFromInput(scanner.nextLine());
                if (command == Command.QUIT) {
                    break;
                }
                try {
                    tourney = execute(command, scanner, tourney);
                } catch (RuntimeException e) {
                    // A failing command must not terminate the whole session.
                    System.out.println("Command " + command + " failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes a single command.
     *
     * @return the tourney to use for subsequent commands
     */
    private Tourney execute(Command command, Scanner scanner, Tourney tourney) throws InterruptedException {
        switch (command) {
            case NEW: {
                Tourney created = createNewTourney(scanner);
                // Keep the current tourney when the user entered an invalid size.
                return created != null ? created : tourney;
            }
            case START:
                if (hasTourney(tourney)) {
                    startRacing(tourney);
                }
                return tourney;
            case REGISTER:
                if (hasTourney(tourney)) {
                    register(scanner, tourney);
                }
                return tourney;
            case DEREGISTER:
                if (hasTourney(tourney)) {
                    deregister(scanner, tourney);
                }
                return tourney;
            default:
                return tourney;
        }
    }

    /** Returns whether a tourney exists, telling the user to create one when it does not. */
    private boolean hasTourney(Tourney tourney) {
        if (tourney == null) {
            System.out.println("No tourney available, create one with " + Command.NEW + " first");
            return false;
        }
        return true;
    }

    private void startRacing(Tourney tourney) throws InterruptedException {
        tourney.start();
    }

    /** Asks for a comma separated list of names and removes each of them from the race. */
    private void deregister(Scanner scanner, Tourney tourney) {
        System.out.println("Enter participants to be removed from race " + tourney.getRacers());
        String input = scanner.nextLine();
        Arrays.stream(input.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .forEach(tourney::deregister);
    }

    /** Asks for a comma separated list of names and registers each of them for the race. */
    private void register(Scanner scanner, Tourney tourney) {
        System.out.println("Enter participants for race slots remaining are : " + tourney.getAvailableSlots());
        String input = scanner.nextLine();
        Arrays.stream(input.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .forEach(tourney::add);
    }

    /**
     * Asks for the number of participants and creates a tourney of that size.
     *
     * @return the new tourney, or {@code null} when the input was not a positive integer
     */
    private Tourney createNewTourney(Scanner scanner) {
        System.out.println("Enter no of participants wanted ");
        String input = scanner.nextLine().trim();
        int participantsCount;
        try {
            participantsCount = Integer.parseInt(input);
        } catch (NumberFormatException e) {
            System.out.println("'" + input + "' is not a valid number of participants");
            return null;
        }
        if (participantsCount < 1) {
            System.out.println("Number of participants must be at least 1");
            return null;
        }
        return new Tourney(participantsCount);
    }
}
package multithreading.horsetourney;
import java.util.Objects;
/**
 * A race participant.
 *
 * <p>When run, a horse waits at the {@link Barrier} for the start, advances by a random
 * amount every {@link #INTERVAL} seconds until it has covered {@link #RUNNING_DISTANCE},
 * and then reports to the barrier that it crossed the finish line. Two horses are equal
 * when their names are equal.
 */
public class Horse implements Runnable {
    private final Barrier barrier;
    private final String name;

    /** Distance a horse has to cover to finish (printed as metres). */
    private static final int RUNNING_DISTANCE = 100;

    /** Pause between two advances, in seconds. */
    private static final int INTERVAL = 5;

    /** Exclusive upper bound of a single random advance. */
    private static final int ONE_TIME_SPUR = 20;

    /**
     * @param barrier barrier used to synchronise start and finish with the other horses
     * @param name    display name, also used for equality
     */
    public Horse(Barrier barrier, String name) {
        this.barrier = barrier;
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Horse horse = (Horse) o;
        return Objects.equals(name, horse.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }

    @Override
    public String toString() {
        return name;
    }

    /**
     * Runs the race for this horse. Failures are printed to standard error; if the thread
     * is interrupted, its interrupt flag is restored before the method returns.
     */
    @Override
    public void run() {
        try {
            print("Waiting for gunshot");
            barrier.awaitGunShot();
            print("Started Running");
            int totalDistance = 0;
            while (totalDistance < RUNNING_DISTANCE) {
                int advance = RandomUtil.random(ONE_TIME_SPUR);
                print("Advancing " + advance + "m");
                totalDistance += advance;
                Thread.sleep(INTERVAL * 1000L);
            }
            print("Crossed finish line");
            barrier.crossedFinishLine(this);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: the owner of this thread may rely on the flag.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints {@code msg} prefixed with the current thread name and this horse's name. */
    private void print(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + " [" + this.name + "] " + msg);
    }
}
package multithreading.horsetourney;
import java.util.Random;
/** Helper around a single shared {@link Random} instance. */
public class RandomUtil {

    /** Generator shared by all callers. */
    private static final Random GENERATOR = new Random();

    /**
     * Draws a pseudo-random integer.
     *
     * @param upperBoundExclusive exclusive upper bound, must be positive
     * @return a value in the range {@code [0, upperBoundExclusive)}
     * @throws IllegalArgumentException if {@code upperBoundExclusive} is not positive
     */
    public static int random(int upperBoundExclusive) {
        final int drawn = GENERATOR.nextInt(upperBoundExclusive);
        return drawn;
    }
}
package multithreading.horsetourney;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
 * A single race with a fixed number of slots.
 *
 * <p>Horses are registered by name until all slots are taken; {@link #start()} then runs
 * every horse on its own pool thread and prints the results. A tourney can be run only
 * once. Instances are not thread-safe; registration and start are expected to happen on
 * one thread.
 */
public class Tourney {
    /** Total number of slots, which is also the number of horses the barrier waits for. */
    private final int count;
    private final Set<Horse> racers;
    private final Barrier barrier;
    private final ExecutorService exe;
    /** Set once {@link #start()} has begun; the barrier and executor cannot be reused. */
    private boolean started;

    /**
     * @param participantsCount number of slots in the race, must be at least 1
     * @throws IllegalArgumentException if {@code participantsCount} is less than 1
     */
    public Tourney(int participantsCount) {
        if (participantsCount < 1) {
            throw new IllegalArgumentException(
                    "participantsCount must be at least 1 but was " + participantsCount);
        }
        this.count = participantsCount;
        this.racers = new HashSet<>();
        this.barrier = new CountDownLatchBarrier(participantsCount);
        this.exe = Executors.newFixedThreadPool(count);
    }

    /** @return number of slots that are still free */
    public int getAvailableSlots() {
        return count - racers.size();
    }

    /**
     * Registers a horse with the given name if a slot is free. Surrounding whitespace is
     * stripped; {@code null}, blank and already registered names are ignored.
     *
     * @param racer name of the horse to register
     */
    public void add(String racer) {
        if (racer == null) {
            return;
        }
        String name = racer.trim();
        if (name.isEmpty() || getAvailableSlots() <= 0) {
            return;
        }
        racers.add(new Horse(barrier, name));
    }

    /** @return read-only view of the registered horses */
    public Collection<Horse> getRacers() {
        return Collections.unmodifiableCollection(racers);
    }

    /**
     * Removes every registered horse whose name matches {@code racer}, ignoring case and
     * surrounding whitespace.
     *
     * @param racer name of the horse to remove; {@code null} is ignored
     */
    public void deregister(String racer) {
        if (racer == null) {
            return;
        }
        String name = racer.trim();
        racers.removeIf(horse -> horse.getName().equalsIgnoreCase(name));
    }

    /**
     * Runs the race and prints the results.
     *
     * @throws IllegalStateException if the tourney was already run, or if not all slots are
     *                               filled (the barrier would otherwise wait forever for
     *                               the missing horses)
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     */
    public void start() throws InterruptedException {
        if (started) {
            throw new IllegalStateException("Tourney has already been run, create a new one");
        }
        int missing = getAvailableSlots();
        if (missing > 0) {
            throw new IllegalStateException(
                    "Cannot start: " + missing + " of " + count + " slots are still empty");
        }
        started = true;
        System.out.println("Starting tourney");
        try {
            for (Horse horse : racers) {
                exe.submit(horse);
            }
            barrier.stats();
        } finally {
            // Always release the pool threads, even when waiting for the results fails.
            exe.shutdownNow();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment