Created
October 25, 2016 19:48
-
-
Save duarten/62841b0c32181bfd7ee9b1e19bcd078e to your computer and use it in GitHub Desktop.
Flat combining
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
/**
 * Flat combiner: serializes {@link Action}s submitted concurrently from many threads.
 *
 * <p>Each call to {@link #combine} atomically swaps a node into the tail of a queue and then
 * spins until either another thread has executed its action, or it is handed the combiner role.
 * The combiner executes up to {@code limit} queued actions, marks each one complete, and then
 * hands the combiner role to the first node it did not process. As a result, actions never run
 * concurrently with one another. An action may run on a thread other than the one that
 * submitted it.
 *
 * <p>Nodes are recycled per thread: after enqueueing, a thread keeps the node it took off the
 * tail (its predecessor) and enqueues that node on its next call, so no node is allocated per
 * call.
 */
public final class Combiner {

    /** A unit of work executed while no other action of the same combiner is running. */
    public interface Action {
        void apply();
    }

    /**
     * Queue node.
     *
     * <p>{@code request}, {@code complete} and {@code failure} are plain fields. They are written
     * before a release store ({@code next.lazySet}, {@code wait.lazySet}, {@code wait.set}) and
     * read only after the matching volatile read ({@code next.get}, {@code wait.get}).
     */
    private static final class Node {
        /** Action to run; written by the submitter, cleared by the combiner. */
        Action request;
        /** 1 while the owner must keep spinning; 0 once processed or handed the combiner role. */
        final AtomicInteger wait = new AtomicInteger();
        /** True once a combiner has executed {@code request}. */
        boolean complete;
        /** Throwable raised by {@code request}, if any, to be rethrown by the submitter. */
        Throwable failure;
        /** Successor in the queue; null while this node is the tail or after it was processed. */
        final AtomicReference<Node> next = new AtomicReference<Node>();
    }

    /** The node each thread will enqueue on its next call to {@link #combine}. */
    private final ThreadLocal<Node> _myNode = new ThreadLocal<Node>() {
        @Override
        protected Node initialValue() {
            return new Node();
        }
    };

    private final int _limit;
    private final AtomicReference<Node> _tail;

    /**
     * Creates a combiner.
     *
     * @param limit maximum number of queued actions one combiner executes before handing the
     *     combiner role to the next waiting thread. It must be at least 1, because the combiner's
     *     own action is the first one it executes.
     * @throws IllegalArgumentException if {@code limit < 1}
     */
    public Combiner(int limit) {
        if (limit < 1) {
            // With limit 0 the combiner would skip its own action and never hand off,
            // leaving every other thread spinning forever.
            throw new IllegalArgumentException("limit must be >= 1, got " + limit);
        }
        _limit = limit;
        _tail = new AtomicReference<Node>(new Node());
    }

    /**
     * Executes {@code action} under this combiner's mutual exclusion and returns once it has run.
     *
     * <p>The action may be executed by whichever thread currently holds the combiner role. If it
     * throws, the throwable is rethrown from this call on the submitting thread: unchecked ones
     * unchanged, anything else wrapped in a {@link RuntimeException}. Other waiting threads are
     * unaffected.
     *
     * @param action the work to run
     * @throws NullPointerException if {@code action} is null
     */
    public void combine(Action action) {
        if (action == null) {
            throw new NullPointerException("action");
        }
        Node nextNode = _myNode.get();
        nextNode.complete = false;
        nextNode.wait.set(1);
        Node curNode = _tail.getAndSet(nextNode);
        _myNode.set(curNode);
        //
        // There's now a window where nextNode/_tail can't be reached.
        // So, any communication has to be done via the previous node
        // in the list, curNode.
        //
        curNode.request = action;
        curNode.next.lazySet(nextNode);

        // Wait until our request has been fulfilled or we are the combiner.
        final int maxSpins = 256;
        int spin = 0;
        while (curNode.wait.get() == 1) {
            if (++spin == maxSpins) {
                Thread.yield();
                spin = 0;
            }
        }
        if (!curNode.complete) {
            runAsCombiner(curNode);
        }

        // Deliver a failure of our own action, whether it was recorded by another combiner or
        // by us. This happens only after any hand-off, so the queue keeps making progress.
        Throwable failure = curNode.failure;
        if (failure != null) {
            curNode.failure = null;
            rethrow(failure);
        }
    }

    /**
     * Executes queued requests starting at {@code start} (the caller's own node, whose
     * {@code next} the caller has already set).
     *
     * <p>Stops after {@code _limit} requests or at the tail, then makes the first unprocessed
     * node's owner the next combiner.
     */
    private void runAsCombiner(Node start) {
        // We copy n's next field into nn, as n becomes untouchable after n.wait.lazySet(0):
        // its owner may immediately reuse it.
        Node n = start;
        Node nn;
        for (int c = 0; c < _limit && (nn = n.next.get()) != null; ++c, n = nn) {
            Throwable failure = null;
            try {
                n.request.apply();
            } catch (Throwable t) {
                // Letting this escape would abandon the combiner role and strand every queued
                // thread. Record it for the submitting thread instead.
                failure = t;
            }
            n.failure = failure;
            n.next.set(null);
            n.request = null;
            n.complete = true;
            n.wait.lazySet(0);
        }
        // Make someone else the combiner.
        n.wait.set(0);
    }

    /** Rethrows {@code t} unchanged if it is unchecked; otherwise wraps it in a RuntimeException. */
    private static void rethrow(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t);
    }

    // ---------------------------------------------------------------------------------------
    // Demo / rough benchmark.
    // ---------------------------------------------------------------------------------------

    static final int threads = 100;
    static final int ops = 1 << 20;

    /**
     * Waits for {@code latch} to reach zero. Interrupts are not abandoned: the wait is retried,
     * and the thread's interrupt status is restored afterwards.
     */
    private static void awaitUninterruptibly(java.util.concurrent.CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Baseline: every thread increments each shared AtomicInteger directly. */
    private static void testWithoutCombining() {
        final AtomicInteger[] counters = new AtomicInteger[5];
        for (int i = 0; i < counters.length; ++i) {
            counters[i] = new AtomicInteger();
        }
        final java.util.concurrent.CountDownLatch done =
                new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        for (AtomicInteger counter : counters) {
                            counter.incrementAndGet();
                        }
                    }
                    done.countDown();
                }
            }.start();
        }
        awaitUninterruptibly(done);
        for (AtomicInteger counter : counters) {
            System.out.println(counter.get());
        }
    }

    /** Same workload, but plain int counters are updated only inside combined actions. */
    private static void testWithCombining() {
        final int[] counters = new int[5];
        final Combiner combiner = new Combiner(100);
        final java.util.concurrent.CountDownLatch done =
                new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        combiner.combine(new Combiner.Action() {
                            @Override
                            public void apply() {
                                for (int k = 0; k < counters.length; ++k) {
                                    counters[k] += 1;
                                }
                            }
                        });
                    }
                    done.countDown();
                }
            }.start();
        }
        awaitUninterruptibly(done);
        for (int counter : counters) {
            System.out.println(counter);
        }
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();
        testWithCombining();
        // Swap in the call below to compare against plain atomic increments.
        //testWithoutCombining();
        long estimatedTime = System.nanoTime() - startTime;
        System.out.println("time: " + estimatedTime / 1000000);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I can't judge the fidelity of the implementation, but I can give some hints on how to improve it:
2 bis) Try to pack the fields into the classes while still protecting against false sharing (JCTools uses abstract classes to control the spacing between fields), padding by 2 cache lines (i.e. common modern x86 architectures use 2 cache lines as the prefetch size, which affects false sharing as well).
I hope my 2 cents are welcome, and compliments on the good work! It is very interesting!
To make my suggestions clearer, I have created a branch of a dummy project of mine: franz1981/java-puzzles@5eb59bb
In it, I have copied the original Combiner implementation (adding a custom IdleStrategy to make benchmarking it with JMH easier)
and built another one (OptimizedCombiner) with denser data structures, hopefully packed to avoid false sharing.
In addition, I have replaced some of the sequentially consistent operations with relaxed ones (without breaking the original intent).
The numbers on my box are (using 4 hammering threads):
And with just one thread (to emulate normal single-threaded usage):
Some notes:
`vanilla` is the original combiner, while `unsafe` is the other "contender", given that both guarantee exclusive execution of actions.
The in-memory layout of the optimized version is the following (from running `OptimizedCombiner::main`):
@stefanofago73
I suppose that access to a shared resource would be a nice use case for a `Combiner`.