Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/af28c5a062b81331e235 to your computer and use it in GitHub Desktop.
Save akarnokd/af28c5a062b81331e235 to your computer and use it in GitHub Desktop.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx;
import java.util.EnumMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Serializer.Strategy;
import rx.functions.Action1;
import static rx.Serializer.Strategy.*;
public class SerializerBenchmark {
/**
 * Verification sink for the benchmark: tallies how many times each integer
 * value was delivered and how many deliveries happened in total, so that
 * lost or duplicated calls can be reported afterwards via {@link #verify(int)}.
 * <p>
 * The counters are plain (non-atomic, non-volatile) fields; this class does
 * no synchronization of its own.
 */
static final class Consumer implements Action1<Integer> {
    /** Per-value delivery counters; slot {@code i} counts deliveries of the value {@code i}. */
    int[] value;
    /** Total number of non-null deliveries seen so far. */
    int count;

    /**
     * @param capacity number of distinct values (0 .. capacity-1) that may be delivered
     */
    public Consumer(int capacity) {
        value = new int[capacity];
    }

    /**
     * Records one delivery of {@code t1}. A {@code null} argument is not
     * counted; a stack trace is printed instead so the offending call path
     * shows up in the benchmark output.
     *
     * @param t1 the delivered value, used as an index into {@link #value}
     */
    @Override
    public void call(Integer t1) {
        if (t1 != null) {
            value[t1]++;
            count++;
        } else {
            new NullPointerException().printStackTrace();
        }
    }

    /**
     * Checks the tallies against the expectation that every value was
     * delivered exactly {@code itemTimes} times and reports discrepancies on
     * {@code System.err}. Only the first mismatching slot is reported.
     *
     * @param itemTimes expected number of deliveries per value
     */
    public void verify(int itemTimes) {
        final int expectedTotal = value.length * itemTimes;
        if (count != expectedTotal) {
            System.err.println("Called only " + count + " times instead of " + expectedTotal + " times");
        }
        int slot = 0;
        while (slot < value.length) {
            final int seen = value[slot];
            if (seen != itemTimes) {
                System.err.println("Found a slot " + slot + " with execution count " + seen);
                return;
            }
            slot++;
        }
    }
}
/**
 * Benchmarks every {@link Strategy} of {@code Serializer}, first from a single
 * thread and then concurrently from 2 up to {@code availableProcessors()}
 * threads, and finally prints one tab-separated row of results per strategy.
 * <p>
 * For each strategy the result array has one slot per thread count: slot 0
 * holds the single-threaded figure and slot {@code t - 1} the figure measured
 * with {@code t} threads.
 *
 * @param args ignored
 * @throws Exception if the main thread is interrupted while pausing between
 *         runs or while waiting for a worker pool to terminate
 */
public static void main(String[] args) throws Exception {
    final int n = 1_000_000;
    final int n2 = 1_000_000;
    final int ncpu = Runtime.getRuntime().availableProcessors();
    EnumMap<Strategy, double[]> throughputs = new EnumMap<>(Strategy.class);

    // reduce overhead of autoboxing
    final Integer[] intCache = new Integer[Math.max(n, n2)];
    for (int i = 0; i < intCache.length; i++) {
        intCache[i] = i;
    }

    // Both lists default to every strategy; substitute an explicit
    // `new Strategy[] { ... }` here to benchmark only a subset.
    Strategy[] singleThreadStrategies = Strategy.values();
    Strategy[] multiThreadStrategies = Strategy.values();

    for (Strategy str : singleThreadStrategies) {
        Thread.sleep(1000);
        System.out.println(str);
        // NOTE(review): the first argument (15) is presumably a repetition
        // count and the second the number of operations per run — confirm
        // against Benchmark.measure.
        double r = Benchmark.measure(15, n, () -> {
            Consumer c = new Consumer(n);
            Serializer<Integer> s = new Serializer<>(str, c);
            for (int i = 0; i < n; i++) {
                s.call(intCache[i]);
            }
            c.verify(1);
        });
        throughputs.computeIfAbsent(str, a -> new double[ncpu])[0] = r;
    }

    for (Strategy str5 : multiThreadStrategies) {
        System.out.println();
        // NOTE(review): DIRECT is excluded from the concurrent runs,
        // presumably because it does not serialize calls — confirm against Serializer.
        if (str5 == DIRECT) {
            continue;
        }
        for (int i = 2; i <= ncpu; i++) {
            Thread.sleep(1000);
            System.out.println(str5 + " concurrenlty with " + i + " threads");
            int k = i; // effectively-final copy for use inside the lambdas
            ExecutorService exec = Executors.newFixedThreadPool(k);
            try {
                double r = Benchmark.measure(15, n2 * k, () -> {
                    Consumer c = new Consumer(n2);
                    Serializer<Integer> s = new Serializer<>(str5, c);
                    CountDownLatch cdl = new CountDownLatch(k);
                    for (int j = 0; j < k; j++) {
                        exec.submit(() -> {
                            try {
                                for (int m = 0; m < n2; m++) {
                                    s.call(intCache[m]);
                                }
                            } finally {
                                // Always release the latch: if a worker fails, the
                                // run must not hang in await(); verify() below will
                                // then report the missing calls.
                                cdl.countDown();
                            }
                        });
                    }
                    try {
                        cdl.await();
                    } catch (InterruptedException e) {
                        // Restore the flag so the caller sees the interruption, and
                        // skip verification because the workers have not finished.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    c.verify(k);
                });
                throughputs.computeIfAbsent(str5, a -> new double[ncpu])[i - 1] = r;
            } finally {
                // Shut the pool down even when the measurement throws; otherwise
                // its non-daemon worker threads would keep the JVM alive.
                exec.shutdown();
            }
            // An interruption here propagates out of main instead of being swallowed.
            exec.awaitTermination(1, TimeUnit.DAYS);
        }
    }

    throughputs.forEach((k, v) -> {
        System.out.print(k);
        System.out.print("\t");
        for (double d : v) {
            System.out.printf("%.3f ops/s\t", d);
        }
        System.out.println();
    });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment