Skip to content

Instantly share code, notes, and snippets.

@Gunisalvo
Created June 15, 2018 16:03
Show Gist options
  • Save Gunisalvo/30bee4786a51c4a4526f9f898ba634c6 to your computer and use it in GitHub Desktop.
Save Gunisalvo/30bee4786a51c4a4526f9f898ba634c6 to your computer and use it in GitHub Desktop.
Concurrency Patterns
import java.util.*;
import java.util.concurrent.*;
public class SensibleApproach {
private static long SIMULATION_TIME = 5000L;
public static void main(String[] args) throws InterruptedException {
UnsafeBuffer guinea = new UnsafeBuffer(10);
Random generator = new Random();
ExecutorService executor = Executors.newFixedThreadPool(10);
pooling(guinea, generator, executor,
() -> {
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume());
//Unsafe!
guinea.clear();
});
guinea.clear();
publisherSubscriber(guinea, generator, executor, () -> {
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume());
//guinea.clear(); //Let the 'Observer' handle this.
});
guinea.clear();
notifyOne(guinea, generator, executor, () -> {
System.out.println(Thread.currentThread().getName() + "->" + guinea.consume());
});
executor.shutdown();
System.out.println("Done without cumbersome Lock handling.");
}
private static void notifyOne(UnsafeBuffer buffer,
Random generator,
ExecutorService executor,
Runnable consumer) throws InterruptedException {
System.out.println(">>> Publisher/Subscriber Method Notify One (predictable)");
Node start = new Node(consumer);
start.buildRing(new Node(consumer), new Node(consumer));
Ring observantTracker = new Ring(start);
Runnable notifier = () -> {
buffer.publish(Arrays.asList(generator.nextInt()));
observantTracker.execute();
//Clear after deliveries.
buffer.clear();
};
long startTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){
Thread.sleep(200L);
executor.submit(notifier);
}
}
private static void publisherSubscriber(UnsafeBuffer buffer,
Random generator,
ExecutorService executor,
Runnable consumer) throws InterruptedException {
System.out.println(">>> Publisher/Subscriber Method Notify All Subscribers (predictable)");
List<Runnable> subscribers = new ArrayList<>();
//controlled access to data structure
subscribers.add(consumer);
subscribers.add(consumer);
subscribers.add(consumer);
Runnable publisher = () -> {
buffer.publish(Arrays.asList(generator.nextInt()));
subscribers.stream().forEach(r -> r.run());
//Clear after deliveries.
buffer.clear();
};
long startTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){
Thread.sleep(200L);
executor.submit(publisher);
}
}
private static void pooling(UnsafeBuffer buffer,
Random generator,
ExecutorService executor,
Runnable consumer) throws InterruptedException {
System.out.println(">>> Pooling Method (unpredictable)");
Runnable producer = () -> buffer.publish(Arrays.asList(generator.nextInt()));
long startTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startTime < SIMULATION_TIME){
if(generator.nextBoolean()){
executor.submit(producer);
}else{
executor.submit(consumer);
}
Thread.sleep(200L);
}
}
}
class UnsafeBuffer {
private final ArrayList<Integer> buffer;
private final int maxSize;
public UnsafeBuffer(int size) {
this.buffer = new ArrayList<>(size);
this.maxSize = size;
}
public List<Integer> consume(){
return this.buffer;
}
public Boolean publish(List<Integer> data){
if(data.size() > this.maxSize){
return Boolean.FALSE;
}
if(this.maxSize < this.buffer.size() + data.size()){
int shift = this.buffer.size() - (this.maxSize - data.size());
List<Integer> newBuffer = new ArrayList<>(this.buffer.subList(shift, this.buffer.size()));
newBuffer.addAll(data);
this.buffer.clear();
this.buffer.addAll(newBuffer);
}else{
this.buffer.addAll(data);
}
return Boolean.TRUE;
}
public Boolean clear(){
this.buffer.clear();
return Boolean.TRUE;
}
}
class Node {
private final Runnable observant;
private Optional<Node> next;
public Node(Runnable observant){
this.observant = observant;
this.next = Optional.empty();
}
public void setNext(Node next){
this.next = Optional.of(next);
}
public void buildRing(Node... next) {
List<Node> nodes = Arrays.asList(next);
Node current = this;
Collections.reverse(nodes);
for(Node n : nodes){
n.setNext(current);
current = n;
}
}
public Optional<Node> executeAnd(){
this.observant.run();
return this.next;
}
}
class Ring {
private Node next;
public Ring(Node next){
this.next = next;
}
public void execute(){
next.executeAnd().ifPresent(n -> this.next = n);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment