Last active
April 15, 2025 14:53
-
-
Save hugomarques/735762e9f8fb5ec08ebb407c58e31526 to your computer and use it in GitHub Desktop.
Benchmark de nestedParallelStreams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.hugomarques.parallel; | |
import org.openjdk.jmh.annotations.*; | |
import org.openjdk.jmh.infra.Blackhole; | |
import org.openjdk.jmh.runner.Runner; | |
import org.openjdk.jmh.runner.RunnerException; | |
import org.openjdk.jmh.runner.options.Options; | |
import org.openjdk.jmh.runner.options.OptionsBuilder; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.stream.Collectors; | |
import java.util.stream.IntStream; | |
@BenchmarkMode(Mode.AverageTime) | |
@OutputTimeUnit(TimeUnit.MILLISECONDS) | |
@State(Scope.Thread) | |
@Fork(1) | |
@Warmup(iterations = 2) | |
@Measurement(iterations = 3) | |
public class NestedParallelStreamBenchmarkV2 { | |
@Param({"5", "10"}) | |
private int outerSize; | |
@Param({"50", "100"}) | |
private int middleSize; | |
@Param({"100", "500"}) | |
private int innerSize; | |
@Param({"1000"}) | |
private int workSize; | |
private List<ComplexObject> outer; | |
private List<ComplexObject> middle; | |
private List<ComplexObject> inner; | |
// Track thread usage | |
private AtomicInteger maxThreadsUsed = new AtomicInteger(0); | |
private AtomicInteger currentThreadsActive = new AtomicInteger(0); | |
@Setup | |
public void setup() { | |
// Create lists with objects that require allocation | |
outer = IntStream.range(0, outerSize) | |
.mapToObj(i -> new ComplexObject("outer-" + i, i)) | |
.collect(Collectors.toList()); | |
middle = IntStream.range(0, middleSize) | |
.mapToObj(i -> new ComplexObject("middle-" + i, i)) | |
.collect(Collectors.toList()); | |
inner = IntStream.range(0, innerSize) | |
.mapToObj(i -> new ComplexObject("inner-" + i, i)) | |
.collect(Collectors.toList()); | |
} | |
@TearDown(Level.Iteration) | |
public void tearDown() { | |
System.out.println("Max threads used: " + maxThreadsUsed.get()); | |
// Reset counter | |
maxThreadsUsed.set(0); | |
currentThreadsActive.set(0); | |
} | |
@Benchmark | |
public void nestedParallelStreams(Blackhole blackhole) { | |
List<Result> results = new ArrayList<>(); | |
outer.parallelStream().forEach(o -> | |
middle.parallelStream().forEach(m -> | |
inner.parallelStream().forEach(i -> { | |
trackThread(); | |
Result r = simulateWorkWithAllocation(o, m, i, workSize); | |
synchronized(results) { | |
results.add(r); | |
} | |
releaseThread(); | |
}) | |
) | |
); | |
blackhole.consume(results); | |
} | |
@Benchmark | |
public void flattenedParallelStream(Blackhole blackhole) { | |
List<Result> results = new ArrayList<>(); | |
outer.parallelStream().forEach(o -> | |
middle.forEach(m -> | |
inner.forEach(i -> { | |
trackThread(); | |
Result r = simulateWorkWithAllocation(o, m, i, workSize); | |
synchronized(results) { | |
results.add(r); | |
} | |
releaseThread(); | |
}) | |
) | |
); | |
blackhole.consume(results); | |
} | |
@Benchmark | |
public void singleParallelStream(Blackhole blackhole) { | |
List<Result> results = new ArrayList<>(); | |
// Create a single flattened stream of tasks | |
List<Task> tasks = new ArrayList<>(); | |
for (ComplexObject o : outer) { | |
for (ComplexObject m : middle) { | |
for (ComplexObject i : inner) { | |
tasks.add(new Task(o, m, i)); | |
} | |
} | |
} | |
tasks.parallelStream().forEach(task -> { | |
trackThread(); | |
Result r = simulateWorkWithAllocation(task.o(), task.m(), task.i(), workSize); | |
synchronized(results) { | |
results.add(r); | |
} | |
releaseThread(); | |
}); | |
blackhole.consume(results); | |
} | |
private void trackThread() { | |
int current = currentThreadsActive.incrementAndGet(); | |
int max; | |
do { | |
max = maxThreadsUsed.get(); | |
if (current <= max) break; | |
} while (!maxThreadsUsed.compareAndSet(max, current)); | |
} | |
private void releaseThread() { | |
currentThreadsActive.decrementAndGet(); | |
} | |
private Result simulateWorkWithAllocation(ComplexObject o, ComplexObject m, ComplexObject i, int iterations) { | |
double result = 0; | |
List<Double> intermediateResults = new ArrayList<>(); | |
for (int j = 0; j < iterations; j++) { | |
// Create some memory pressure | |
String combined = o.name() + "-" + m.name() + "-" + i.name() + "-" + j; | |
result += Math.sqrt(o.value() + m.value() + i.value() + combined.hashCode() % 10); | |
// Add intermediate results to create some allocation pressure | |
if (j % 100 == 0) { | |
intermediateResults.add(result); | |
} | |
} | |
return new Result(o, m, i, result, intermediateResults); | |
} | |
// Convert to records | |
record ComplexObject(String name, int value, byte[] payload) { | |
ComplexObject(String name, int value) { | |
this(name, value, new byte[1024]); // Add some memory footprint to each object | |
} | |
} | |
record Result(ComplexObject o, ComplexObject m, ComplexObject i, double value, List<Double> intermediates) {} | |
record Task(ComplexObject o, ComplexObject m, ComplexObject i) {} | |
public static void main(String[] args) throws RunnerException { | |
Options opt = new OptionsBuilder() | |
.include(NestedParallelStreamBenchmarkV2.class.getName()) | |
.forks(1) | |
.warmupIterations(2) | |
.measurementIterations(3) | |
.threads(Runtime.getRuntime().availableProcessors()) | |
.build(); | |
new Runner(opt).run(); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment