Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Modified Universal Construction
package universal;
public class Mean
{
public static void main(String... args)
{
Universal<Integer, Average.State, Double> universal = new Universal<Integer, Average.State, Double>(100, new Average());
for (int i = 0; i < THREADS; i++)
{
new Thread(new Test(i * SIZE, universal)).start();
}
}
private static final int THREADS = 10;
private static final int SIZE = 10000;
static class Test implements Runnable
{
final int start;
final Universal<Integer, Average.State, Double> universal;
Test(int start, Universal<Integer, Average.State, Double> universal)
{
this.start = start;
this.universal = universal;
}
@Override
public void run()
{
for (int i = start; i < start + SIZE; i++)
{
System.out.println(Thread.currentThread() + " " + i + ": " + universal.apply(i));
}
}
}
static class Average implements Sequential<Integer, Average.State, Double>
{
static class State
{
final int sum;
final int count;
private State(int sum, int count)
{
this.sum = sum;
this.count = count;
}
State add(int value)
{
return new State(sum + value, count + 1);
}
@Override
public String toString()
{
return sum + " / " + count;
}
}
@Override
public State apply(State prior, Integer invocation)
{
return prior.add(invocation);
}
@Override
public Double value(State state)
{
return ((double)state.sum)/ state.count;
}
@Override
public State initial()
{
return new Average.State(0, 0);
}
}
}
package universal;
import java.util.concurrent.atomic.AtomicReference;
public class Node<E, S>
{
public static final <F, T> Node<F, T> tail()
{
Node<F, T> node = new Node<F, T>(null) {
@Override
public String toString()
{
return "tail";
}
};
node.seq = 0;
return node;
}
private volatile int seq = -1;
private final E invocation;
private final AtomicReference<S> state = new AtomicReference<>();
private final AtomicReference<Node<E, S>> next = new AtomicReference<>();
private final AtomicReference<Node<E, S>> previous = new AtomicReference<>();
public Node(E invocation)
{
this.invocation = invocation;
}
public Node<E, S> decideNext(Node<E, S> candidate)
{
if (next.compareAndSet(null, candidate)) {
candidate.seq = this.seq + 1;
candidate.previous.set(this);
}
return next();
}
public E getInvocation()
{
return invocation;
}
public void setState(S state)
{
this.state.compareAndSet(null, state);
}
public S getState()
{
return state.get();
}
public Node<E, S> previous()
{
return previous.get();
}
public Node<E, S> next()
{
return next.get();
}
public int sequence()
{
return seq;
}
public boolean isSequenced()
{
return seq >= 0;
}
public void truncate()
{
previous.set(null);
}
}
package universal;
/*
* E - input type
* P - prior state type
* R - result type
*/
public interface Sequential<E, S, R>
{
S initial();
S apply(S priorState, E value);
R value(S state);
}
package universal;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class Universal<E, S, R>
{
private final AtomicInteger nextIndex = new AtomicInteger(0);
private final ThreadLocal<Integer> index = new ThreadLocal<Integer>() {
protected Integer initialValue() {return nextIndex.getAndIncrement();};
};
private final ThreadLocal<Integer> lastSequence = new ThreadLocal<Integer>() {
protected Integer initialValue() {return 0;};
};
private final Sequential<E, S, R> sequential;
private final AtomicReferenceArray<Node<E, S>> head;
private final AtomicReferenceArray<Node<E, S>> announce;
Universal(int n, Sequential<E, S, R> sequential)
{
this.sequential = sequential;
this.head = new AtomicReferenceArray<>(n);
this.announce = new AtomicReferenceArray<>(n);
final Node<E, S> tail = Node.tail();
tail.setState(sequential.initial());
for (int i = 0; i < n; i++) {
head.set(i, tail);
announce.set(i, tail);
}
}
public R apply(E invocation)
{
final Node<E, S> mine = new Node<>(invocation);
final int threadIndex = index.get();
announce.set(threadIndex, mine);
Node<E, S> prefer = null;
while (!mine.isSequenced()) {
if (prefer == null) {
final Node<E, S> help = announce.get(lastSequence.get() % announce.length());
prefer = help == null || help.isSequenced() ? mine : help;
}
while (!prefer.isSequenced()) {
head.set(threadIndex, max().decideNext(prefer));
}
prefer = mine;
}
announce.set(threadIndex, null);
lastSequence.set(mine.sequence());
return evaluate(mine);
}
private Node<E, S> max() {
Node<E, S> max = head.get(0);
for (int i = 0; i < head.length(); i++) {
if (max.sequence() < head.get(i).sequence()) max = head.get(i);
}
return max;
}
/** ensure all head references are moving forward to k */
private void moveForward(Node<E, S> to)
{
for (int i = 0; i < head.length(); i++) {
Node<E, S> node = head.get(i);
if (to.sequence() > node.sequence()) head.compareAndSet(i, node, to);
}
}
private void ensurePrior(Node<E, S> node)
{
final Node<E,S> previous = node.previous();
if (previous != null && previous.getState() == null) {
evaluate(previous);
}
}
private R evaluate(final Node<E, S> node)
{
ensurePrior(node);
final Node<E, S> last = node.previous();
if (last != null) {
node.setState(sequential.apply(last.getState(), node.getInvocation()));
node.truncate();
}
moveForward(node);
return sequential.value(node.getState());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.