Skip to content

Instantly share code, notes, and snippets.

@hugomarques
Last active April 15, 2025 14:53
Show Gist options
  • Save hugomarques/735762e9f8fb5ec08ebb407c58e31526 to your computer and use it in GitHub Desktop.
Save hugomarques/735762e9f8fb5ec08ebb407c58e31526 to your computer and use it in GitHub Desktop.
Benchmark of nested parallel streams
package dev.hugomarques.parallel;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * JMH benchmark comparing three ways of running the same outer x middle x inner grid of CPU- and
 * allocation-heavy tasks.
 *
 * <ul>
 *   <li>{@link #nestedParallelStreams}: a parallel stream at every nesting level.
 *   <li>{@link #flattenedParallelStream}: only the outer level is parallel; the middle and inner
 *       loops run sequentially inside each outer task.
 *   <li>{@link #singleParallelStream}: the whole grid is first materialized into one task list,
 *       which is then processed by a single parallel stream.
 * </ul>
 *
 * <p>Every task also records how many tasks are executing concurrently. The peak is printed and
 * reset after each iteration (warmup and measurement).
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(1)
@Warmup(iterations = 2)
@Measurement(iterations = 3)
public class NestedParallelStreamBenchmarkV2 {

    /** Number of elements in the outermost list. */
    @Param({"5", "10"})
    private int outerSize;

    /** Number of elements in the middle list. */
    @Param({"50", "100"})
    private int middleSize;

    /** Number of elements in the innermost list. */
    @Param({"100", "500"})
    private int innerSize;

    /** Loop iterations performed by each task in {@link #simulateWorkWithAllocation}. */
    @Param({"1000"})
    private int workSize;

    private List<ComplexObject> outer;
    private List<ComplexObject> middle;
    private List<ComplexObject> inner;

    // Current and peak number of tasks executing inside runTask at the same moment. The task body
    // does not fork/join further work, so a thread runs at most one such task at a time and the
    // peak approximates the number of threads doing useful work.
    private final AtomicInteger maxThreadsUsed = new AtomicInteger(0);
    private final AtomicInteger currentThreadsActive = new AtomicInteger(0);

    /** Builds the three input lists once per trial, sized by the {@code @Param} fields. */
    @Setup
    public void setup() {
        outer = createObjects("outer-", outerSize);
        middle = createObjects("middle-", middleSize);
        inner = createObjects("inner-", innerSize);
    }

    /** Creates {@code size} objects named {@code prefix + index}, each with value {@code index}. */
    private static List<ComplexObject> createObjects(String prefix, int size) {
        return IntStream.range(0, size)
                .mapToObj(i -> new ComplexObject(prefix + i, i))
                .collect(Collectors.toList());
    }

    /** Prints the peak concurrency observed during the iteration, then resets both counters. */
    @TearDown(Level.Iteration)
    public void tearDown() {
        System.out.println("Max threads used: " + maxThreadsUsed.get());
        maxThreadsUsed.set(0);
        currentThreadsActive.set(0);
    }

    /** Parallel stream at each of the three nesting levels. */
    @Benchmark
    public void nestedParallelStreams(Blackhole blackhole) {
        List<Result> results = new ArrayList<>(totalTasks());
        outer.parallelStream().forEach(o ->
                middle.parallelStream().forEach(m ->
                        inner.parallelStream().forEach(i -> runTask(o, m, i, results))));
        blackhole.consume(results);
    }

    /** Parallel stream over the outer list only; middle and inner are iterated sequentially. */
    @Benchmark
    public void flattenedParallelStream(Blackhole blackhole) {
        List<Result> results = new ArrayList<>(totalTasks());
        outer.parallelStream().forEach(o ->
                middle.forEach(m ->
                        inner.forEach(i -> runTask(o, m, i, results))));
        blackhole.consume(results);
    }

    /**
     * Materializes every (outer, middle, inner) triple into one list, then processes it with a
     * single parallel stream. Building the task list is part of the measured time.
     */
    @Benchmark
    public void singleParallelStream(Blackhole blackhole) {
        List<Task> tasks = new ArrayList<>(totalTasks());
        for (ComplexObject o : outer) {
            for (ComplexObject m : middle) {
                for (ComplexObject i : inner) {
                    tasks.add(new Task(o, m, i));
                }
            }
        }
        List<Result> results = new ArrayList<>(tasks.size());
        tasks.parallelStream().forEach(task -> runTask(task.o(), task.m(), task.i(), results));
        blackhole.consume(results);
    }

    /** Total number of (outer, middle, inner) combinations; used to presize result lists. */
    private int totalTasks() {
        return outer.size() * middle.size() * inner.size();
    }

    /**
     * Runs one unit of work for the given triple and appends its result to {@code results},
     * updating the concurrency counters around the work.
     */
    private void runTask(ComplexObject o, ComplexObject m, ComplexObject i, List<Result> results) {
        trackThread();
        try {
            Result r = simulateWorkWithAllocation(o, m, i, workSize);
            // ArrayList is not thread-safe and parallel-stream tasks append concurrently.
            synchronized (results) {
                results.add(r);
            }
        } finally {
            // Always release so an exception cannot leave the active counter inflated.
            releaseThread();
        }
    }

    /** Marks a task as active and raises the recorded peak if the new count exceeds it. */
    private void trackThread() {
        int current = currentThreadsActive.incrementAndGet();
        maxThreadsUsed.accumulateAndGet(current, Math::max);
    }

    /** Marks a task as finished. */
    private void releaseThread() {
        currentThreadsActive.decrementAndGet();
    }

    /**
     * Performs {@code iterations} rounds of string concatenation and square-root arithmetic,
     * boxing every 100th running total into a list to create allocation pressure.
     *
     * @return a {@link Result} holding the inputs, the final running total and the sampled totals
     */
    private Result simulateWorkWithAllocation(ComplexObject o, ComplexObject m, ComplexObject i, int iterations) {
        double result = 0;
        List<Double> intermediateResults = new ArrayList<>();
        for (int j = 0; j < iterations; j++) {
            // Allocates a fresh String every iteration to create memory pressure.
            String combined = o.name() + "-" + m.name() + "-" + i.name() + "-" + j;
            // floorMod keeps the hash term in [0, 9]. Plain % is negative for negative hash codes,
            // which made the sqrt argument negative (NaN) when the object values were small.
            int hashTerm = Math.floorMod(combined.hashCode(), 10);
            result += Math.sqrt(o.value() + m.value() + i.value() + hashTerm);
            if (j % 100 == 0) {
                intermediateResults.add(result);
            }
        }
        return new Result(o, m, i, result, intermediateResults);
    }

    /** Input element carrying a name, an int value and a 1 KiB payload for memory footprint. */
    record ComplexObject(String name, int value, byte[] payload) {
        ComplexObject(String name, int value) {
            this(name, value, new byte[1024]);
        }
    }

    /** Output of one task: its inputs, the final computed value and the sampled intermediates. */
    record Result(ComplexObject o, ComplexObject m, ComplexObject i, double value, List<Double> intermediates) {}

    /** One (outer, middle, inner) combination to be processed. */
    record Task(ComplexObject o, ComplexObject m, ComplexObject i) {}

    /**
     * Runs this benchmark with one fork, 2 warmup and 3 measurement iterations.
     *
     * <p>NOTE(review): {@code threads(availableProcessors())} runs one benchmark invocation per
     * JMH thread at the same time. Each invocation submits its parallel streams to the same shared
     * common ForkJoinPool, so the invocations also compete with each other. Use {@code threads(1)}
     * to measure a single invocation in isolation.
     */
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(NestedParallelStreamBenchmarkV2.class.getName())
                .forks(1)
                .warmupIterations(2)
                .measurementIterations(3)
                .threads(Runtime.getRuntime().availableProcessors())
                .build();
        new Runner(opt).run();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment