Last active
January 29, 2017 19:54
-
-
Save James-Matthew-Watson/b724f3b61f5759071b104563d0d8a0e6 to your computer and use it in GitHub Desktop.
Modified Universal Construction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package universal; | |
public class Mean | |
{ | |
public static void main(String... args) | |
{ | |
Universal<Integer, Average.State, Double> universal = new Universal<Integer, Average.State, Double>(100, new Average()); | |
for (int i = 0; i < THREADS; i++) | |
{ | |
new Thread(new Test(i * SIZE, universal)).start(); | |
} | |
} | |
private static final int THREADS = 10; | |
private static final int SIZE = 10000; | |
static class Test implements Runnable | |
{ | |
final int start; | |
final Universal<Integer, Average.State, Double> universal; | |
Test(int start, Universal<Integer, Average.State, Double> universal) | |
{ | |
this.start = start; | |
this.universal = universal; | |
} | |
@Override | |
public void run() | |
{ | |
for (int i = start; i < start + SIZE; i++) | |
{ | |
System.out.println(Thread.currentThread() + " " + i + ": " + universal.apply(i)); | |
} | |
} | |
} | |
static class Average implements Sequential<Integer, Average.State, Double> | |
{ | |
static class State | |
{ | |
final int sum; | |
final int count; | |
private State(int sum, int count) | |
{ | |
this.sum = sum; | |
this.count = count; | |
} | |
State add(int value) | |
{ | |
return new State(sum + value, count + 1); | |
} | |
@Override | |
public String toString() | |
{ | |
return sum + " / " + count; | |
} | |
} | |
@Override | |
public State apply(State prior, Integer invocation) | |
{ | |
return prior.add(invocation); | |
} | |
@Override | |
public Double value(State state) | |
{ | |
return ((double)state.sum)/ state.count; | |
} | |
@Override | |
public State initial() | |
{ | |
return new Average.State(0, 0); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package universal; | |
import java.util.concurrent.atomic.AtomicReference; | |
public class Node<E, S> | |
{ | |
public static final <F, T> Node<F, T> tail() | |
{ | |
Node<F, T> node = new Node<F, T>(null) { | |
@Override | |
public String toString() | |
{ | |
return "tail"; | |
} | |
}; | |
node.seq = 0; | |
return node; | |
} | |
private volatile int seq = -1; | |
private final E invocation; | |
private final AtomicReference<S> state = new AtomicReference<>(); | |
private final AtomicReference<Node<E, S>> next = new AtomicReference<>(); | |
private final AtomicReference<Node<E, S>> previous = new AtomicReference<>(); | |
public Node(E invocation) | |
{ | |
this.invocation = invocation; | |
} | |
public Node<E, S> decideNext(Node<E, S> candidate) | |
{ | |
if (next.compareAndSet(null, candidate)) { | |
candidate.seq = this.seq + 1; | |
candidate.previous.set(this); | |
} | |
return next(); | |
} | |
public E getInvocation() | |
{ | |
return invocation; | |
} | |
public void setState(S state) | |
{ | |
this.state.compareAndSet(null, state); | |
} | |
public S getState() | |
{ | |
return state.get(); | |
} | |
public Node<E, S> previous() | |
{ | |
return previous.get(); | |
} | |
public Node<E, S> next() | |
{ | |
return next.get(); | |
} | |
public int sequence() | |
{ | |
return seq; | |
} | |
public boolean isSequenced() | |
{ | |
return seq >= 0; | |
} | |
public void truncate() | |
{ | |
previous.set(null); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package universal; | |
/* | |
* E - input type | |
* P - prior state type | |
* R - result type | |
*/ | |
public interface Sequential<E, S, R> | |
{ | |
S initial(); | |
S apply(S priorState, E value); | |
R value(S state); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package universal; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReferenceArray; | |
public class Universal<E, S, R> | |
{ | |
private final AtomicInteger nextIndex = new AtomicInteger(0); | |
private final ThreadLocal<Integer> index = new ThreadLocal<Integer>() { | |
protected Integer initialValue() {return nextIndex.getAndIncrement();}; | |
}; | |
private final ThreadLocal<Integer> lastSequence = new ThreadLocal<Integer>() { | |
protected Integer initialValue() {return 0;}; | |
}; | |
private final Sequential<E, S, R> sequential; | |
private final AtomicReferenceArray<Node<E, S>> head; | |
private final AtomicReferenceArray<Node<E, S>> announce; | |
Universal(int n, Sequential<E, S, R> sequential) | |
{ | |
this.sequential = sequential; | |
this.head = new AtomicReferenceArray<>(n); | |
this.announce = new AtomicReferenceArray<>(n); | |
final Node<E, S> tail = Node.tail(); | |
tail.setState(sequential.initial()); | |
for (int i = 0; i < n; i++) { | |
head.set(i, tail); | |
announce.set(i, tail); | |
} | |
} | |
public R apply(E invocation) | |
{ | |
final Node<E, S> mine = new Node<>(invocation); | |
final int threadIndex = index.get(); | |
announce.set(threadIndex, mine); | |
Node<E, S> prefer = null; | |
while (!mine.isSequenced()) { | |
if (prefer == null) { | |
final Node<E, S> help = announce.get(lastSequence.get() % announce.length()); | |
prefer = help == null || help.isSequenced() ? mine : help; | |
} | |
while (!prefer.isSequenced()) { | |
head.set(threadIndex, max().decideNext(prefer)); | |
} | |
prefer = mine; | |
} | |
announce.set(threadIndex, null); | |
lastSequence.set(mine.sequence()); | |
return evaluate(mine); | |
} | |
private Node<E, S> max() { | |
Node<E, S> max = head.get(0); | |
for (int i = 0; i < head.length(); i++) { | |
if (max.sequence() < head.get(i).sequence()) max = head.get(i); | |
} | |
return max; | |
} | |
/** ensure all head references are moving forward to k */ | |
private void moveForward(Node<E, S> to) | |
{ | |
for (int i = 0; i < head.length(); i++) { | |
Node<E, S> node = head.get(i); | |
if (to.sequence() > node.sequence()) head.compareAndSet(i, node, to); | |
} | |
} | |
private void ensurePrior(Node<E, S> node) | |
{ | |
final Node<E,S> previous = node.previous(); | |
if (previous != null && previous.getState() == null) { | |
evaluate(previous); | |
} | |
} | |
private R evaluate(final Node<E, S> node) | |
{ | |
ensurePrior(node); | |
final Node<E, S> last = node.previous(); | |
if (last != null) { | |
node.setState(sequential.apply(last.getState(), node.getInvocation())); | |
node.truncate(); | |
} | |
moveForward(node); | |
return sequential.value(node.getState()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment