@joshed-io
Created August 19, 2013 21:41
Simple Storm Trident DRPC topology that spends too much time waiting for LMAX disruptor slots. Posted for the mailing list.

This Trident topology emits the DRPC argument to a splitter, which then emits the argument 1,000 times to a family of 100 bolts, each of which emits 5,000 results. A final aggregator increments a count for each result. With these numbers, the expected DRPC result is 1,000 * 5,000 = 5,000,000.

While this program seems like it should be simple and fast, it's not. It spends 75% of its time waiting on the LMAX disruptor, so just emitting tuples becomes the clear bottleneck. Here's a snippet of log timings for emitting 5k tuples from a single execute call.

[VisualVM profiler screenshot]

2013-08-19 14:31:01.056 [Thread-167-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=3,082.48
2013-08-19 14:31:01.057 [Thread-163-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=2,946.93
2013-08-19 14:31:01.057 [Thread-145-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=2,962.79
2013-08-19 14:31:01.057 [Thread-187-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=69.27
2013-08-19 14:31:01.059 [Thread-193-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=3,392.72
2013-08-19 14:31:01.061 [Thread-223-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=3,173.38
2013-08-19 14:31:01.064 [Thread-143-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=7.83
2013-08-19 14:31:01.065 [Thread-64-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=2,942.67
2013-08-19 14:31:01.066 [Thread-167-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=9.24
2013-08-19 14:31:01.066 [Thread-193-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=7.25
2013-08-19 14:31:01.068 [Thread-139-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=2,951.14
2013-08-19 14:31:01.068 [Thread-223-b-4] [DEBUG] i.k.s.d.b.PerformanceTestDrpcStream EachFunctionExecute, t=7.53

This results in a response time of 25 seconds to process 5M tuples, or 200k tuples/second (local cluster, 1 worker, 8 GB heap).

The disruptor waiting gets worse with parallelism: with 10 EachFunctions the waiting is usually hundreds of milliseconds, but with 100 it's multiple seconds (up to 8 or 9).

If, instead of emitting 5,000 tuples, I emit 1 tuple containing a list of 5,000 objects, the response time for the entire call drops to 1-2 seconds.
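For reference, here's roughly what that batched variant looks like (a sketch only; the class name and use of a plain ArrayList are illustrative and not part of the gist code below, which would also need java.util.List/ArrayList imported, plus an aggregator change to add the list size rather than increment per tuple):

    // Sketch of the batched variant: one emit carrying a list of 5000 strings
    // instead of 5000 separate emits.
    private static class BatchedEachFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple objects, TridentCollector collector) {
            List<String> batch = new ArrayList<String>(5000);
            for (int i = 0; i < 5000; i++) {
                batch.add(objects.getString(0));
            }
            collector.emit(new Values(batch)); // single tuple, single disruptor publish
        }
    }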

I've tried changing the LMAX queue settings per http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/ but it makes little difference. I've also run this topology in plain Storm (not Trident) and the same problem happens.
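For context, this is the kind of tuning from that post that I tried (a sketch; the values here are illustrative, the keys are the disruptor-backed buffer sizes the post describes, and Config is backtype.storm.Config):

    // Buffer-size tuning along the lines of the linked post
    // (sizes are powers of two; values shown are illustrative).
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);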

Are my expectations about the emit rate just way off (i.e., should 5k small strings take multiple seconds)? If so, that's fine; I just want to make sure I'm not missing something obvious here.

package io.keen.storm.drpc.basic;

import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import io.keen.base.BaseDrpcStream;
import io.keen.util.Env;
import io.keen.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.TridentTopology;
import storm.trident.operation.*;
import storm.trident.tuple.TridentTuple;

import java.util.concurrent.atomic.AtomicInteger;

public class PerformanceTestDrpcStream extends BaseDrpcStream {

    private static final Logger LOG = LoggerFactory.getLogger(PerformanceTestDrpcStream.class);

    @Override
    public void addToTopology(TridentTopology topology, LocalDRPC drpc, Env config) {
        // DRPC arg -> splitter (x1000) -> shuffle across 100 EachFunction instances (x5000 each) -> count
        topology.newDRPCStream("performance", drpc)
                .each(new Fields("args"), new SplitterFunction(), new Fields("index"))
                .shuffle()
                .each(new Fields("index"), new EachFunction(), new Fields("added"))
                .parallelismHint(100)
                .aggregate(new Fields("added"), new SimpleAggregator(), new Fields("result"));
    }

    // Emits the incoming string 5000 times; the emit loop is where the disruptor waiting shows up.
    private static class EachFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple objects, TridentCollector collector) {
            Timer timer = Timer.start("bolt");
            for (int i = 0; i < 5000; i++) {
                collector.emit(new Values(objects.getString(0)));
            }
            timer.endQuietly();
            LOG.debug("EachFunctionExecute, t={}", timer.getFormattedDiff());
        }
    }

    // Fans the single DRPC argument out into 1000 tuples.
    private static class SplitterFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple objects, TridentCollector collector) {
            Timer timer = Timer.start("bolt");
            for (int i = 0; i < 1000; i++) {
                collector.emit(new Values(objects.getString(0)));
            }
            timer.endQuietly();
            LOG.debug("IndexFunctionExecute, t={}", timer.getFormattedDiff());
        }
    }

    // Counts every tuple it sees and emits the total; expected result is 1000 * 5000 = 5,000,000.
    private static class SimpleAggregator extends BaseAggregator {
        @Override
        public Object init(Object o, TridentCollector collector) {
            return new AtomicInteger();
        }

        @Override
        public void aggregate(Object o, TridentTuple objects, TridentCollector collector) {
            ((AtomicInteger) o).getAndIncrement();
        }

        @Override
        public void complete(Object o, TridentCollector collector) {
            collector.emit(new Values(((AtomicInteger) o).get()));
        }
    }
}
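For completeness, here's roughly how the stream gets exercised locally (a sketch; the topology name, DRPC argument, and Env construction are illustrative, not the exact harness):

    // Sketch of a local driver (Config/LocalCluster are from backtype.storm; the
    // io.keen.util.Env instance ("config") is assumed to be built elsewhere).
    LocalDRPC drpc = new LocalDRPC();
    TridentTopology topology = new TridentTopology();
    new PerformanceTestDrpcStream().addToTopology(topology, drpc, config);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("performance-test", new Config(), topology.build());

    // Blocks until the DRPC call completes; expected result is 1000 * 5000 = 5,000,000.
    String result = drpc.execute("performance", "some-arg");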