Created
June 15, 2018 16:03
-
-
Save Gunisalvo/30bee4786a51c4a4526f9f898ba634c6 to your computer and use it in GitHub Desktop.
Concurrency Patterns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.*; | |
public class SensibleApproach { | |
private static long SIMULATION_TIME = 5000L; | |
public static void main(String[] args) throws InterruptedException { | |
UnsafeBuffer guinea = new UnsafeBuffer(10); | |
Random generator = new Random(); | |
ExecutorService executor = Executors.newFixedThreadPool(10); | |
pooling(guinea, generator, executor, | |
() -> { | |
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume()); | |
//Unsafe! | |
guinea.clear(); | |
}); | |
guinea.clear(); | |
publisherSubscriber(guinea, generator, executor, () -> { | |
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume()); | |
//guinea.clear(); //Let the 'Observer' handle this. | |
}); | |
guinea.clear(); | |
notifyOne(guinea, generator, executor, () -> { | |
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume()); | |
}); | |
executor.shutdown(); | |
System.out.println("Done without cumbersome Lock handling."); | |
} | |
private static void notifyOne(UnsafeBuffer buffer, | |
Random generator, | |
ExecutorService executor, | |
Runnable consumer) throws InterruptedException { | |
System.out.println(">>> Publisher/Subscriber Method Notify One (predictable)"); | |
Node start = new Node(consumer); | |
start.buildRing(new Node(consumer), new Node(consumer)); | |
Ring observantTracker = new Ring(start); | |
Runnable notifier = () -> { | |
buffer.publish(Arrays.asList(generator.nextInt())); | |
observantTracker.execute(); | |
//Clear after deliveries. | |
buffer.clear(); | |
}; | |
long startTime = System.currentTimeMillis(); | |
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){ | |
Thread.sleep(200L); | |
executor.submit(notifier); | |
} | |
} | |
private static void publisherSubscriber(UnsafeBuffer buffer, | |
Random generator, | |
ExecutorService executor, | |
Runnable consumer) throws InterruptedException { | |
System.out.println(">>> Publisher/Subscriber Method Notify All Subscribers (predictable)"); | |
List<Runnable> subscribers = new ArrayList<>(); | |
//controlled access to data structure | |
subscribers.add(consumer); | |
subscribers.add(consumer); | |
subscribers.add(consumer); | |
Runnable publisher = () -> { | |
buffer.publish(Arrays.asList(generator.nextInt())); | |
subscribers.stream().forEach(r -> r.run()); | |
//Clear after deliveries. | |
buffer.clear(); | |
}; | |
long startTime = System.currentTimeMillis(); | |
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){ | |
Thread.sleep(200L); | |
executor.submit(publisher); | |
} | |
} | |
private static void pooling(UnsafeBuffer buffer, | |
Random generator, | |
ExecutorService executor, | |
Runnable consumer) throws InterruptedException { | |
System.out.println(">>> Pooling Method (unpredictable)"); | |
Runnable producer = () -> buffer.publish(Arrays.asList(generator.nextInt())); | |
long startTime = System.currentTimeMillis(); | |
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){ | |
if(generator.nextBoolean()){ | |
executor.submit(producer); | |
}else{ | |
executor.submit(consumer); | |
} | |
Thread.sleep(200L); | |
} | |
} | |
} | |
class UnsafeBuffer { | |
private final ArrayList<Integer> buffer; | |
private final int maxSize; | |
public UnsafeBuffer(int size) { | |
this.buffer = new ArrayList<>(size); | |
this.maxSize = size; | |
} | |
public List<Integer> consume(){ | |
return this.buffer; | |
} | |
public Boolean publish(List<Integer> data){ | |
if(data.size() > this.maxSize){ | |
return Boolean.FALSE; | |
} | |
if(this.maxSize < this.buffer.size() + data.size()){ | |
int shift = this.buffer.size() - (this.maxSize - data.size()); | |
List<Integer> newBuffer = new ArrayList<>(this.buffer.subList(shift, this.buffer.size())); | |
newBuffer.addAll(data); | |
this.buffer.clear(); | |
this.buffer.addAll(newBuffer); | |
}else{ | |
this.buffer.addAll(data); | |
} | |
return Boolean.TRUE; | |
} | |
public Boolean clear(){ | |
this.buffer.clear(); | |
return Boolean.TRUE; | |
} | |
} | |
class Node { | |
private final Runnable observant; | |
private Optional<Node> next; | |
public Node(Runnable observant){ | |
this.observant = observant; | |
this.next = Optional.empty(); | |
} | |
public void setNext(Node next){ | |
this.next = Optional.of(next); | |
} | |
public void buildRing(Node... next) { | |
List<Node> nodes = Arrays.asList(next); | |
Node current = this; | |
Collections.reverse(nodes); | |
for(Node n : nodes){ | |
n.setNext(current); | |
current = n; | |
} | |
} | |
public Optional<Node> executeAnd(){ | |
this.observant.run(); | |
return this.next; | |
} | |
} | |
class Ring { | |
private Node next; | |
public Ring(Node next){ | |
this.next = next; | |
} | |
public void execute(){ | |
next.executeAnd().ifPresent(n -> this.next = n); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment