Skip to content

Instantly share code, notes, and snippets.

@geoff-m
Created December 21, 2020 00:36
Show Gist options
  • Save geoff-m/b0977f9e4d1d9edba4bfa58678c1a0ff to your computer and use it in GitHub Desktop.
Save geoff-m/b0977f9e4d1d9edba4bfa58678c1a0ff to your computer and use it in GitHub Desktop.
Demo application using ForkJoinPool
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
double sequentialTime = 1;
for (int threads = 1; threads <= 12; ++threads) {
var input = makeTest(500_000_000);
var start = System.nanoTime();
var answer = Solve.getSumOfReciprocals(input, threads);
var stop = System.nanoTime();
var timeMS = (stop - start) / 1000000d;
if (threads == 1)
sequentialTime = timeMS;
// Stopped printing answer since it doesn't matter.
//System.out.format("Threads %d:\tTime (ms): %f\tSpeedup: %.2fx\tSum: %f\n", threads, timeMS, sequentialTime / timeMS, answer);
System.out.format("Threads %d:\tTime (ms): %f\tSpeedup: %.2fx\t\n", threads, timeMS, sequentialTime / timeMS);
}
}
static float[] makeTest(int size) {
final int THREADS = 10;
float[] ret = new float[size];
var pool = ForkJoinPool.commonPool();
var chunkSize = size / THREADS;
var extra = size - (THREADS * chunkSize);
var futures = new ForkJoinTask[THREADS];
for (int i = 0; i < THREADS; ++i) {
var worker = new TestMaker(
i * chunkSize,
i * chunkSize + chunkSize + (i == THREADS - 1 ? extra : 0),
ret);
futures[i] = pool.submit(worker);
}
for (var f : futures) {
try {
// Void, just wait for completion.
f.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error creating test data: " + e);
}
}
return ret;
}
static class TestMaker implements Runnable {
private final float[] res;
private final int start, stop;
public TestMaker(int start, int stop, float[] result) {
this.start = start;
this.stop = stop;
this.res = result;
}
@Override
public void run() {
Random r = new Random();
for (int i = start; i < stop; ++i)
res[i] = r.nextFloat();
}
}
}
Threads 1: Time (ms): 440.480400 Speedup: 1.00x
Threads 2: Time (ms): 222.024300 Speedup: 1.98x
Threads 3: Time (ms): 148.120800 Speedup: 2.97x
Threads 4: Time (ms): 107.478600 Speedup: 4.10x
Threads 5: Time (ms): 86.099900 Speedup: 5.12x
Threads 6: Time (ms): 75.250900 Speedup: 5.85x
Threads 7: Time (ms): 65.285000 Speedup: 6.75x
Threads 8: Time (ms): 55.899600 Speedup: 7.88x
Threads 9: Time (ms): 53.841500 Speedup: 8.18x
Threads 10: Time (ms): 47.180800 Speedup: 9.34x
Threads 11: Time (ms): 45.975800 Speedup: 9.58x
Threads 12: Time (ms): 47.435800 Speedup: 9.29x
import java.util.concurrent.*;
public class Solve {
public static double getSumOfReciprocals(float[] data, int threads) {
//var pool = new ForkJoinPool(threads);
var pool = ForkJoinPool.commonPool();
var chunkSize = data.length / threads;
var extra = data.length - chunkSize * threads;
var futures = new ForkJoinTask[threads];
for (int i=0; i < threads; ++i) {
futures[i] = pool.submit(new SumOfReciprocalsWorker(
i * chunkSize,
i * chunkSize + chunkSize + (i == threads - 1 ? extra : 0),
data));
}
pool.shutdown();
double ret = 0;
for (var f : futures) {
try {
ret += (double)f.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error during adding: " + e);
}
}
return ret;
}
private static class SumOfReciprocalsWorker implements Callable<Double> {
private final float[] input;
private final int start, stop;
public SumOfReciprocalsWorker(int start, int stop, float[] input) {
this.start = start;
this.stop = stop;
this.input = input;
}
@Override
public Double call() {
double ret = 0;
for (int i = start; i < stop; ++i) {
ret += 1 / input[i];
}
return ret;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment