@duarten
Created October 25, 2016 19:48
Flat combining
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public final class Combiner {
    public interface Action {
        public void apply();
    }

    private static final class Node {
        public Action request;
        public AtomicInteger wait = new AtomicInteger();
        public boolean complete;
        public AtomicReference<Node> next = new AtomicReference<Node>();
    }

    private ThreadLocal<Node> _myNode = new ThreadLocal<Node>() {
        @Override
        protected Node initialValue() {
            return new Node();
        }
    };
    private final int _limit;
    private AtomicReference<Node> _tail;

    public Combiner(int limit) {
        _limit = limit;
        _tail = new AtomicReference<Node>(new Node());
    }

    public void combine(Action action) {
        Node nextNode = _myNode.get();
        nextNode.complete = false;
        nextNode.wait.set(1);
        Node curNode = _tail.getAndSet(nextNode);
        _myNode.set(curNode);
        //
        // There's now a window where nextNode/_tail can't be reached.
        // So, any communication has to be done via the previous node
        // in the list, curNode.
        //
        curNode.request = action;
        curNode.next.lazySet(nextNode);
        // Wait until our request has been fulfilled or we are the combiner.
        final int maxSpins = 256;
        int spin = 0;
        while (curNode.wait.get() == 1) {
            if (++spin == maxSpins) {
                Thread.yield();
                spin = 0;
            }
        }
        if (curNode.complete)
            return;
        // We are now the combiner. We copy n's Next field into nn, as n will
        // become untouchable after n.wait.lazySet(0), due to reuse.
        Node n = curNode;
        Node nn;
        for (int c = 0; c < _limit && (nn = n.next.get()) != null; ++c, n = nn) {
            n.request.apply();
            n.next.set(null);
            n.request = null;
            n.complete = true;
            n.wait.lazySet(0);
        }
        // Make someone else the combiner.
        n.wait.set(0);
    }

    static final int threads = 100;
    static final int ops = 1 << 20;

    private static void testWithoutCombining() {
        final AtomicInteger[] counters = new AtomicInteger[5];
        for (int i = 0; i < counters.length; ++i)
            counters[i] = new AtomicInteger();
        final java.util.concurrent.CountDownLatch l = new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        for (AtomicInteger c : counters) {
                            c.incrementAndGet();
                        }
                    }
                    l.countDown();
                }
            }.start();
        }
        do {
            try {
                l.await();
                break;
            } catch (InterruptedException e) { }
        } while (true);
        for (int i = 0; i < counters.length; ++i)
            System.out.println(counters[i].get());
    }

    private static void testWithCombining() {
        final int[] counters = new int[5];
        final Combiner c = new Combiner(100);
        final java.util.concurrent.CountDownLatch l = new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        c.combine(new Combiner.Action() {
                            @Override
                            public void apply() {
                                for (int c = 0; c < counters.length; ++c) {
                                    counters[c] += 1;
                                }
                            }});
                    }
                    l.countDown();
                }
            }.start();
        }
        do {
            try {
                l.await();
                break;
            } catch (InterruptedException e) { }
        } while (true);
        for (int i = 0; i < counters.length; ++i)
            System.out.println(counters[i]);
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();
        testWithCombining();
        //testWithoutCombining();
        long estimatedTime = System.nanoTime() - startTime;
        System.out.println("time: " + estimatedTime / 1000000);
    }
}

stefanofago73 commented Jul 12, 2017

I tried this code on Java 8, on Windows with an i7 and 8 GB of RAM.
With the original code, flat combining was about two times slower...
So I modified the code as follows:

  1. To avoid changing too much code, I turned Action into a functional interface:

    @FunctionalInterface
    public interface Action {
        public void apply();
    }
    
  2. In the combine method, I replaced Thread.yield() with LockSupport.parkNanos:

    final int maxSpins = 256;
    int spin = 0;
    while (curNode.wait.get() == 1) {
    	if (++spin == maxSpins) {
    		LockSupport.parkNanos(1L); // ok...it's not the same as yield...
    		spin = 0;
    	}
    }
    
  3. In testWithCombining, I used a lambda:

    new Thread() {
        @Override
        public void run() {
            for (int j = 0; j < ops; ++j) {
                c.combine(() -> {
                    for (int c = 0; c < counters.length; ++c) {
                        counters[c] += 1;
                    }
                });
            }
            l.countDown();
        }
    }.start();
    

Does that make sense?

Now I get (times in ms, as printed by main):

with combining time: 6166
without combining time: 19000

Thank You for this interesting code!


franz1981 commented Sep 7, 2018

I can't judge the fidelity of the implementation, but I can give some hints on how to improve it:

  1. LockSupport/Thread.yield are just methadone: they alleviate the cost of cache coherency traffic by an amount that depends on the running environment and usage, while hurting latencies and making them less predictable. I would suggest adding a customizable WaitStrategy/IdleStrategy there to let the user choose what to do (n threads < n cores with pinned threads == win).
  2. Try to pack more data into the data structures (e.g. the nodes) to avoid pointer chasing, which hurts performance both from the GC point of view (the infamous GC barriers) and because the load/store fences the JVM puts around concurrent operations do not spare you the chase needed to load an instance referenced from a field.
    2 bis) When packing fields into the classes, consider false sharing protection (JCTools uses abstract classes to control the space between fields) with 2 cache lines of padding (common modern x86 architectures use that as the prefetching size, which affects false sharing as well).
  3. Please use JMH, so that the JVM does not perform optimizations (e.g. dropping code that has no visible effect) that would not happen in real code.
  4. The number of threads in the test should be < n cores, unless you want to measure the cost of context switching or bench the OS scheduler :P

I hope my 2 cents are welcome, and compliments on the good work! It is very interesting!
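
As a rough illustration of suggestion 1, the spin loop in combine() could delegate to a pluggable strategy. This is only a minimal sketch: the names `IdleStrategy`, `IdleStrategies` and `awaitZero` are hypothetical and are not taken from the gist or from the linked branch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical interface: invoked after every failed check of the wait flag.
interface IdleStrategy {
    // idleCount is the number of consecutive failed checks so far.
    void idle(int idleCount);
}

final class IdleStrategies {
    private IdleStrategies() { }

    // Pure busy spin: never gives up the core.
    static final IdleStrategy SPIN = idleCount -> { };

    // Yield on every 256th failed check, like the original combine() loop.
    static final IdleStrategy YIELD = idleCount -> {
        if ((idleCount & 255) == 0) Thread.yield();
    };

    // Park briefly on every 256th failed check, like the parkNanos variant.
    static final IdleStrategy PARK = idleCount -> {
        if ((idleCount & 255) == 0) LockSupport.parkNanos(1L);
    };

    // Waits until flag is no longer 1; returns how many times it idled.
    static int awaitZero(AtomicInteger flag, IdleStrategy strategy) {
        int idleCount = 0;
        while (flag.get() == 1) {
            strategy.idle(++idleCount);
        }
        return idleCount;
    }
}
```

With something like this, the waiting loop in combine() would shrink to a single `IdleStrategies.awaitZero(curNode.wait, strategy)` call, and the strategy could be passed to the Combiner constructor.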

To make my suggestions clearer, I have created a branch of a dummy project of mine: franz1981/java-puzzles@5eb59bb

In it I have copied the original Combiner implementation (adding a custom IdleStrategy to make it easier to benchmark with JMH)
and built another one (OptimizedCombiner) with more "dense" data structures, hopefully padded to avoid false sharing.
In addition, I have replaced some of the sequentially consistent operations with relaxed ones (without breaking the original intent).
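
One way to get "denser" nodes with relaxed stores, using only the standard library, is to keep the fields inline and access them through field updaters. The sketch below is an assumption about the general technique, not the code from the linked commit (the benchmark label "unsafe" suggests that one uses a different mechanism); `DenseNode` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical node: wait and next are plain volatile fields of the node
// itself, instead of separate AtomicInteger / AtomicReference objects, so
// reading them does not require following an extra reference.
final class DenseNode {
    private static final AtomicIntegerFieldUpdater<DenseNode> WAIT =
        AtomicIntegerFieldUpdater.newUpdater(DenseNode.class, "wait");
    private static final AtomicReferenceFieldUpdater<DenseNode, DenseNode> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(DenseNode.class, DenseNode.class, "next");

    Runnable request;
    boolean complete;
    private volatile int wait;
    private volatile DenseNode next;

    // Volatile (sequentially consistent) accesses.
    int getWait() { return wait; }
    void setWait(int value) { wait = value; }
    DenseNode getNext() { return next; }

    // Ordered stores: cheaper than a volatile write, same role as the
    // lazySet calls in the original combine().
    void lazySetWait(int value) { WAIT.lazySet(this, value); }
    void lazySetNext(DenseNode node) { NEXT.lazySet(this, node); }
}
```

The trade-off is the usual one for `lazySet`: earlier writes are not reordered after the store, but the store itself may become visible to other threads slightly later than a volatile write would.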

The numbers on my box are (using 4 hammering threads):

Benchmark              (combinerType)  (idleStrategyType)  Mode  Cnt     Score      Error  Units
CombinerBench.combine         vanilla                spin  avgt   10   516.590 ±   13.473  ns/op
CombinerBench.combine         vanilla               yield  avgt   10   470.046 ±   41.168  ns/op
CombinerBench.combine         vanilla                park  avgt   10   517.785 ±   66.786  ns/op
CombinerBench.combine          unsafe                spin  avgt   10   326.038 ±    3.921  ns/op
CombinerBench.combine          unsafe               yield  avgt   10   346.840 ±    2.885  ns/op
CombinerBench.combine          unsafe                park  avgt   10   346.230 ±   14.160  ns/op
CombinerBench.combine            lock                spin  avgt   10  2157.008 ± 3378.290  ns/op
CombinerBench.combine            lock               yield  avgt   10   353.903 ±   20.211  ns/op
CombinerBench.combine            lock                park  avgt   10   315.353 ±   64.988  ns/op

With just one thread (to emulate normal single-threaded usage):

Benchmark              (combinerType)  (idleStrategyType)  Mode  Cnt   Score   Error  Units
CombinerBench.combine         vanilla                spin  avgt   10  48.550 ± 0.902  ns/op
CombinerBench.combine         vanilla               yield  avgt   10  49.079 ± 1.496  ns/op
CombinerBench.combine         vanilla                park  avgt   10  48.539 ± 0.768  ns/op
CombinerBench.combine          unsafe                spin  avgt   10  26.858 ± 0.502  ns/op
CombinerBench.combine          unsafe               yield  avgt   10  26.689 ± 0.448  ns/op
CombinerBench.combine          unsafe                park  avgt   10  27.732 ± 1.972  ns/op
CombinerBench.combine            lock                spin  avgt   10  16.880 ± 0.196  ns/op
CombinerBench.combine            lock               yield  avgt   10  17.035 ± 0.860  ns/op
CombinerBench.combine            lock                park  avgt   10  17.245 ± 0.417  ns/op

Some notes:

  • vanilla is the original combiner, while unsafe is the other one (OptimizedCombiner)
  • the benchmark using a (spin) lock is not present in the original code, but given the intent of a combiner I suppose it is a good
    "contender", since both guarantee exclusive execution of actions
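
For reference, a spin-lock "contender" of the kind mentioned in the notes could look roughly like the following. This is a minimal sketch under assumptions: `SpinLockExecutor` and `demo` are hypothetical names, and the benchmark in the linked branch may implement its lock differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical spin-lock executor: like the combiner, it guarantees that
// actions run one at a time, but every thread runs its own action.
final class SpinLockExecutor {
    private final AtomicBoolean locked = new AtomicBoolean();

    void execute(Runnable action) {
        // Test-and-test-and-set: only attempt the CAS when the lock looks free.
        while (locked.get() || !locked.compareAndSet(false, true)) {
            Thread.yield();
        }
        try {
            action.run();
        } finally {
            locked.set(false); // release the lock
        }
    }

    // Increments a plain (non-atomic) counter from several threads.
    static int demo(int threads, int opsPerThread) {
        SpinLockExecutor lock = new SpinLockExecutor();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; ++i) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < opsPerThread; ++j) {
                    lock.execute(() -> counter[0]++);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }
}
```

Because the counter is only touched while the lock is held, `SpinLockExecutor.demo(4, 10000)` should return exactly 40000, just as the combiner test prints `threads * ops` for each counter.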

The layout in memory of the optimized version is this one (running `OptimizedCombiner::main`):

# Running 64-bit HotSpot VM.
# Using compressed oop with 3-bit shift.
# Using compressed klass with 3-bit shift.
# Objects are 8 bytes aligned.
# Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes]
# Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes]

red.hat.puzzles.combiner.TailCombiner$Node object internals:
 OFFSET  SIZE                                         TYPE DESCRIPTION                               VALUE
      0    12                                              (object header)                           N/A
     12     4                                              (alignment/padding gap)                  
     16     8                                         long Pad0Node.p00                              N/A
     24     8                                         long Pad0Node.p01                              N/A
     32     8                                         long Pad0Node.p02                              N/A
     40     8                                         long Pad0Node.p03                              N/A
     48     8                                         long Pad0Node.p04                              N/A
     56     8                                         long Pad0Node.p05                              N/A
     64     8                                         long Pad0Node.p06                              N/A
     72     8                                         long Pad0Node.p07                              N/A
     80     8                                         long Pad0Node.p10                              N/A
     88     8                                         long Pad0Node.p11                              N/A
     96     8                                         long Pad0Node.p12                              N/A
    104     8                                         long Pad0Node.p13                              N/A
    112     8                                         long Pad0Node.p14                              N/A
    120     8                                         long Pad0Node.p15                              N/A
    128     8                                         long Pad0Node.p16                              N/A
    136     4                                          int WaitFieldNode.wait                        N/A
    140     4                                              (alignment/padding gap)                  
    144     8                                         long Pad1Node.p00                              N/A
    152     8                                         long Pad1Node.p01                              N/A
    160     8                                         long Pad1Node.p02                              N/A
    168     8                                         long Pad1Node.p03                              N/A
    176     8                                         long Pad1Node.p04                              N/A
    184     8                                         long Pad1Node.p05                              N/A
    192     8                                         long Pad1Node.p06                              N/A
    200     8                                         long Pad1Node.p10                              N/A
    208     8                                         long Pad1Node.p11                              N/A
    216     8                                         long Pad1Node.p12                              N/A
    224     8                                         long Pad1Node.p13                              N/A
    232     8                                         long Pad1Node.p14                              N/A
    240     8                                         long Pad1Node.p15                              N/A
    248     8                                         long Pad1Node.p16                              N/A
    256     8                                         long Pad1Node.p17                              N/A
    264     4                           java.lang.Runnable Node.request                              N/A
    268     4   red.hat.puzzles.combiner.TailCombiner.Node Node.next                                 N/A
Instance size: 272 bytes
Space losses: 8 bytes internal + 0 bytes external = 8 bytes total

red.hat.puzzles.combiner.OptimizedCombiner object internals:
 OFFSET  SIZE                                         TYPE DESCRIPTION                               VALUE
      0    12                                              (object header)                           N/A
     12     4                                              (alignment/padding gap)                  
     16     8                                         long Pad0Combiner.p00                          N/A
     24     8                                         long Pad0Combiner.p01                          N/A
     32     8                                         long Pad0Combiner.p02                          N/A
     40     8                                         long Pad0Combiner.p03                          N/A
     48     8                                         long Pad0Combiner.p04                          N/A
     56     8                                         long Pad0Combiner.p05                          N/A
     64     8                                         long Pad0Combiner.p06                          N/A
     72     8                                         long Pad0Combiner.p07                          N/A
     80     8                                         long Pad0Combiner.p10                          N/A
     88     8                                         long Pad0Combiner.p11                          N/A
     96     8                                         long Pad0Combiner.p12                          N/A
    104     8                                         long Pad0Combiner.p13                          N/A
    112     8                                         long Pad0Combiner.p14                          N/A
    120     8                                         long Pad0Combiner.p15                          N/A
    128     8                                         long Pad0Combiner.p16                          N/A
    136     4   red.hat.puzzles.combiner.TailCombiner.Node TailCombiner._tail                        N/A
    140     4                                              (alignment/padding gap)                  
    144     8                                         long Pad1Combiner.p00                          N/A
    152     8                                         long Pad1Combiner.p01                          N/A
    160     8                                         long Pad1Combiner.p02                          N/A
    168     8                                         long Pad1Combiner.p03                          N/A
    176     8                                         long Pad1Combiner.p04                          N/A
    184     8                                         long Pad1Combiner.p05                          N/A
    192     8                                         long Pad1Combiner.p06                          N/A
    200     8                                         long Pad1Combiner.p10                          N/A
    208     8                                         long Pad1Combiner.p11                          N/A
    216     8                                         long Pad1Combiner.p12                          N/A
    224     8                                         long Pad1Combiner.p13                          N/A
    232     8                                         long Pad1Combiner.p14                          N/A
    240     8                                         long Pad1Combiner.p15                          N/A
    248     8                                         long Pad1Combiner.p16                          N/A
    256     8                                         long Pad1Combiner.p17                          N/A
    264     4                                          int OptimizedCombiner._limit                  N/A
    268     4                        java.lang.ThreadLocal OptimizedCombiner._myNode                 N/A
Instance size: 272 bytes
Space losses: 8 bytes internal + 0 bytes external = 8 bytes total
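
The Node layout above is consistent with a padding hierarchy along the following lines. This is a hypothetical reconstruction from the dump, not the code from the linked branch: the superclass names and padding field names mirror the JOL output, while `PaddedNode` is a made-up name for the leaf class. It relies on HotSpot laying out superclass fields before subclass fields, which the dump shows for this particular VM but which the Java language does not guarantee.

```java
// 15 longs (120 bytes) in front of the hot field.
abstract class Pad0Node {
    long p00, p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16;
}

// The contended field sits alone between the two padding blocks.
abstract class WaitFieldNode extends Pad0Node {
    volatile int wait;
}

// 15 more longs (120 bytes) after the hot field. The names hide those in
// Pad0Node, which is legal and matches the dump.
abstract class Pad1Node extends WaitFieldNode {
    long p00, p01, p02, p03, p04, p05, p06;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

// Fields that are written less often live after the second padding block.
final class PaddedNode extends Pad1Node {
    Runnable request;
    PaddedNode next;
}
```

Splitting the fields across abstract classes is what keeps the padding on both sides of `wait`; declaring everything in one class would leave the field order up to the JVM.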

@stefanofago73
I suppose that access to a shared resource would be a nice use case for a Combiner
