Skip to content

Instantly share code, notes, and snippets.

@rmannibucau
Created March 13, 2018 09:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1 to your computer and use it in GitHub Desktop.
Save rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1 to your computer and use it in GitHub Desktop.
package com.github.rmannibucau.beamvscompletionstage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.OptionsBuilder;
//
// Use case: take a string and compute its length.
// Everything runs synchronously so that the Beam and CompletionStage variants can be compared directly.
//
/**
 * JMH benchmark state comparing three ways of computing the length of a string: a plain Beam {@code DoFn},
 * a {@code DoFn} fed with a {@link CompletableFuture}-backed {@link CompletionStage}, and a {@code DoFn} fed
 * with the lightweight {@code FastCompletionFuture}. Each variant has its own function, invoker and argument
 * provider; all of them are assigned in {@code init()}.
 */
@State(Scope.Thread)
public class Comparison {
// raw input shared by every benchmark
private String element;
// synchronous variant: the DoFn receives the String itself
private LengthFn directFn;
private DoFnInvoker<String, Integer> directRunner;
private DoFnInvoker.ArgumentProvider<String, Integer> directArgs;
// variant whose element is a CompletableFuture.completedFuture(...)
private CompletionStageLengthFn completionStageFn;
private DoFnInvoker<CompletionStage<String>, Integer> completionStageRunner;
private DoFnInvoker.ArgumentProvider<CompletionStage<String>, Integer> completionStageArgs;
// variant whose element is a FastCompletionFuture
private FastCompletionStageLengthFn fastCompletionStageFn;
private DoFnInvoker<CompletionStage<String>, Integer> fastCompletionStageRunner;
private DoFnInvoker.ArgumentProvider<CompletionStage<String>, Integer> fastCompletionStageArgs;
/**
 * Prepares the benchmark state: the input element, then for each of the three variants (direct,
 * CompletionStage, fast CompletionStage) its function, its invoker and its argument provider.
 * Finally runs the setup phase of every invoker so that all three are measured in the same lifecycle state.
 */
@Setup
public void init() {
    element = "testrecord";

    directFn = new LengthFn();
    directRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(directFn);
    directArgs = directFn.newArgProvider(element);

    completionStageFn = new CompletionStageLengthFn();
    completionStageRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(completionStageFn);
    completionStageArgs = completionStageFn.newArgProvider(element);

    fastCompletionStageFn = new FastCompletionStageLengthFn();
    fastCompletionStageRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(fastCompletionStageFn);
    fastCompletionStageArgs = fastCompletionStageFn.newArgProvider(element);

    // every benchmarked invoker goes through setup, including the fast one
    directRunner.invokeSetup();
    completionStageRunner.invokeSetup();
    fastCompletionStageRunner.invokeSetup();
    // we skip the bundle since it is not that needed and requires an impl of contexts
}
/**
 * Baseline measurement: the raw element is handed to the blackhole without any processing.
 *
 * @param blackhole JMH sink consuming the value
 */
@Benchmark
public void passthrough(final Blackhole blackhole) {
    final String raw = element;
    blackhole.consume(raw);
}
/**
 * Computes the length through an already completed {@link CompletableFuture}, without Beam.
 *
 * @param blackhole JMH sink consuming the resulting future
 */
@Benchmark
public void defaultCompletionStage(final Blackhole blackhole) {
    final CompletableFuture<String> resolved = CompletableFuture.completedFuture(element);
    final CompletableFuture<Integer> length = resolved.thenApply(String::length);
    blackhole.consume(length);
}
/**
 * Computes the length through a {@code FastCompletionFuture}, without Beam.
 *
 * @param blackhole JMH sink consuming the resulting future
 */
@Benchmark
public void fastCompletionStage(final Blackhole blackhole) {
    final FastCompletionFuture<String> resolved = new FastCompletionFuture<>(element);
    final CompletableFuture<Integer> length = resolved.thenApply(String::length);
    blackhole.consume(length);
}
/**
 * Invokes the process-element step of the plain (String based) function through its Beam invoker.
 *
 * @param blackhole JMH sink consuming whatever the invoker returns
 */
@Benchmark
public void beam(final Blackhole blackhole) {
    final Object outcome = directRunner.invokeProcessElement(directArgs);
    blackhole.consume(outcome);
}
/**
 * Invokes the process-element step of the {@code CompletableFuture} based function through its Beam invoker.
 *
 * @param blackhole JMH sink consuming whatever the invoker returns
 */
@Benchmark
public void beamCompletionStage(final Blackhole blackhole) {
    final Object outcome = completionStageRunner.invokeProcessElement(completionStageArgs);
    blackhole.consume(outcome);
}
/**
 * Invokes the process-element step of the {@code FastCompletionFuture} based function through its Beam invoker.
 *
 * @param blackhole JMH sink consuming whatever the invoker returns
 */
@Benchmark
public void beamFastCompletionStage(final Blackhole blackhole) {
    final Object outcome = fastCompletionStageRunner.invokeProcessElement(fastCompletionStageArgs);
    blackhole.consume(outcome);
}
/**
 * Runs the teardown phase of every benchmarked invoker, including the fast CompletionStage one,
 * so that all three variants go through the same lifecycle.
 */
@TearDown
public void release() {
    // symmetrically we skip finish bundle
    fastCompletionStageRunner.invokeTeardown();
    completionStageRunner.invokeTeardown();
    directRunner.invokeTeardown();
}
/**
 * Synchronous variant: receives the string itself and outputs its length.
 */
public static class LengthFn extends ProcessContextFactoryDoFn<String, Integer> {

    /** The raw string is already the element type, so it is returned untouched. */
    @Override
    protected String contextElement(final String element) {
        return element;
    }

    /** Outputs the length of the current element. */
    @ProcessElement
    public void onElement(final ProcessContext context) {
        final String current = context.element();
        context.output(current.length());
    }
}
/**
 * Variant whose element is a {@link CompletionStage} backed by an already completed {@link CompletableFuture}.
 */
public static class CompletionStageLengthFn extends ProcessContextFactoryDoFn<CompletionStage<String>, Integer> {

    /** Wraps the raw string into a completed {@link CompletableFuture}. */
    @Override
    protected CompletionStage<String> contextElement(final String element) {
        return CompletableFuture.completedFuture(element);
    }

    /** Registers a callback on the element which outputs the length of the resolved string. */
    @ProcessElement
    public void onElement(final ProcessContext context) {
        final CompletionStage<String> pending = context.element();
        pending.thenAccept(resolved -> context.output(resolved.length()));
    }
}
/**
 * Variant whose element is a {@link CompletionStage} backed by a {@code FastCompletionFuture}.
 */
public static class FastCompletionStageLengthFn extends ProcessContextFactoryDoFn<CompletionStage<String>, Integer> {

    /** Wraps the raw string into a {@code FastCompletionFuture}. */
    @Override
    protected CompletionStage<String> contextElement(final String element) {
        return new FastCompletionFuture<>(element);
    }

    /** Registers a callback on the element which outputs the length of the resolved string. */
    @ProcessElement
    public void onElement(final ProcessContext context) {
        final CompletionStage<String> pending = context.element();
        pending.thenAccept(resolved -> context.output(resolved.length()));
    }
}
/**
 * Lightweight future intended for already resolved cases which don't need all the {@link CompletableFuture}
 * machinery.
 *
 * <p>{@link #thenAccept(Consumer)} and {@link #thenApply(Function)} are short-circuited: the callback runs
 * immediately on the calling thread against the known value, and anything it throws propagates directly to
 * the caller instead of being captured in the returned future.
 *
 * <p>The parent future is completed with the same value at construction time, so every inherited method
 * ({@code get}, {@code join}, {@code isDone}, {@code thenCompose}, ...) sees a resolved future instead of
 * waiting forever.
 *
 * @param <T> type of the resolved value
 */
public static class FastCompletionFuture<T>
        extends CompletableFuture<T> /* lazy to impl everything, normally impl CompletionStage */ {

    /**
     * Result handed back by {@link #thenAccept(Consumer)}. It carries no value, so a single completed
     * instance is shared by all callers, which keeps that path allocation-free and type-safe.
     */
    private static final FastCompletionFuture<Void> DONE = new FastCompletionFuture<>(null);

    private final T value;

    /**
     * @param value the already known result, may be {@code null}
     */
    public FastCompletionFuture(final T value) {
        this.value = value;
        // resolve the parent too, otherwise the inherited API would treat this future as still pending
        super.complete(value);
    }

    /**
     * Runs {@code action} right away with the known value.
     *
     * @param action consumer of the value
     * @return an already completed future without value
     */
    @Override
    public CompletableFuture<Void> thenAccept(final Consumer<? super T> action) {
        action.accept(value);
        // a real Void future rather than "this" cast to another type parameter
        return DONE;
    }

    /**
     * Applies {@code fn} right away to the known value.
     *
     * @param fn  mapping function
     * @param <U> type of the mapped value
     * @return a new resolved future holding the mapped value
     */
    @Override
    public <U> CompletableFuture<U> thenApply(final Function<? super T, ? extends U> fn) {
        return new FastCompletionFuture<>(fn.apply(value));
    }
}
/**
 * Base {@link DoFn} of the benchmark which can also fabricate what an invoker needs to call it:
 * a process context and an argument provider. Only the pieces used by a process-element call are
 * implemented; bundle contexts, timers, state and restriction tracking throw
 * {@link UnsupportedOperationException}.
 *
 * @param <A> input element type
 * @param <B> output element type
 */
public abstract static class ProcessContextFactoryDoFn<A, B> extends DoFn<A, B> {

    /**
     * Converts the raw benchmark string into the element type consumed by this function.
     *
     * @param element raw string
     * @return the element exposed through the process context
     */
    protected abstract A contextElement(String element);

    /**
     * Creates an argument provider bound to {@code element}. The element is converted once and the
     * pipeline options and timestamp are created once; each {@code processContext} call then builds a
     * new context from them.
     *
     * @param element raw string to convert with {@link #contextElement(String)}
     * @return the provider to pass to the invoker
     */
    public DoFnInvoker.ArgumentProvider<A, B> newArgProvider(final String element) {
        final A input = contextElement(element);
        final PipelineOptions opts = PipelineOptionsFactory.create();
        final Instant createdAt = Instant.now();
        return new DoFnInvoker.ArgumentProvider<A, B>() {
            @Override
            public DoFn<A, B>.ProcessContext processContext(final DoFn<A, B> doFn) {
                return newProcessContext(input, createdAt, opts);
            }

            @Override
            public PipelineOptions pipelineOptions() {
                return opts;
            }

            @Override
            public BoundedWindow window() {
                return GlobalWindow.INSTANCE;
            }

            // everything below is not needed for a process-element call

            @Override
            public DoFn<A, B>.StartBundleContext startBundleContext(final DoFn<A, B> doFn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public DoFn<A, B>.FinishBundleContext finishBundleContext(final DoFn<A, B> doFn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public DoFn<A, B>.OnTimerContext onTimerContext(final DoFn<A, B> doFn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public RestrictionTracker<?, ?> restrictionTracker() {
                throw new UnsupportedOperationException();
            }

            @Override
            public org.apache.beam.sdk.state.State state(final String stateId) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Timer timer(final String timerId) {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Creates a process context exposing the given element, timestamp and options.
     *
     * @param element element returned by the context
     * @param now     timestamp returned by the context
     * @param options pipeline options returned by the context
     * @return a new context instance
     */
    public DoFn<A, B>.ProcessContext newProcessContext(final A element, final Instant now, final PipelineOptions options) {
        return new ProcessContextImpl(element, now, options);
    }

    /**
     * Stub process context: serves fixed values and discards the main output.
     */
    private class ProcessContextImpl extends ProcessContext {
        private final A input;
        private final Instant fixedTimestamp;
        private final PipelineOptions opts;

        private ProcessContextImpl(final A element, final Instant now, final PipelineOptions options) {
            input = element;
            fixedTimestamp = now;
            opts = options;
        }

        @Override
        public A element() {
            return input;
        }

        @Override
        public Instant timestamp() {
            return fixedTimestamp;
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return opts;
        }

        @Override
        public PaneInfo pane() {
            return null;
        }

        @Override
        public void output(final B output) {
            // no-op: this is NOT the beam impl so it makes run faster than expected
        }

        @Override
        public void updateWatermark(final Instant watermark) {
            // no-op
        }

        @Override
        public void outputWithTimestamp(final B output, final Instant timestamp) {
            throw new UnsupportedOperationException(
                    "in real this will create a wrapper exactly like a CompletionFuture.completed");
        }

        @Override
        public <T> void output(final TupleTag<T> tag, T output) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <T> void outputWithTimestamp(final TupleTag<T> tag, final T output, final Instant timestamp) {
            throw new UnsupportedOperationException();
        }

        @Override
        public <T> T sideInput(final PCollectionView<T> view) {
            throw new UnsupportedOperationException();
        }
    }
}
/**
 * Stand-alone argument provider for functions producing {@link Integer}: it serves the global window,
 * freshly created pipeline options, and process contexts built by the given function. All other
 * arguments (bundle contexts, timers, state, restriction tracker) are unsupported.
 *
 * @param <T> input element type
 */
public static class BaseArgProvider<T> implements DoFnInvoker.ArgumentProvider<T, Integer> {
    private final PipelineOptions opts = PipelineOptionsFactory.create();
    private final Instant createdAt = Instant.now();
    private final T input;
    private final ProcessContextFactoryDoFn<T, Integer> contextFactory;

    /**
     * @param element element exposed by the created process contexts
     * @param fn      function used to create the process contexts
     */
    public BaseArgProvider(final T element, final ProcessContextFactoryDoFn<T, Integer> fn) {
        input = element;
        contextFactory = fn;
    }

    @Override
    public DoFn<T, Integer>.ProcessContext processContext(final DoFn<T, Integer> doFn) {
        return contextFactory.newProcessContext(input, createdAt, opts);
    }

    @Override
    public PipelineOptions pipelineOptions() {
        return opts;
    }

    @Override
    public BoundedWindow window() {
        return GlobalWindow.INSTANCE;
    }

    // everything below is not provided by this implementation

    @Override
    public DoFn<T, Integer>.StartBundleContext startBundleContext(final DoFn<T, Integer> doFn) {
        throw new UnsupportedOperationException();
    }

    @Override
    public DoFn<T, Integer>.FinishBundleContext finishBundleContext(final DoFn<T, Integer> doFn) {
        throw new UnsupportedOperationException();
    }

    @Override
    public DoFn<T, Integer>.OnTimerContext onTimerContext(final DoFn<T, Integer> doFn) {
        throw new UnsupportedOperationException();
    }

    @Override
    public RestrictionTracker<?, ?> restrictionTracker() {
        throw new UnsupportedOperationException();
    }

    @Override
    public org.apache.beam.sdk.state.State state(final String stateId) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Timer timer(final String timerId) {
        throw new UnsupportedOperationException();
    }
}
/**
 * Launches the benchmarks of this class with JMH: throughput mode, one fork, 6 warmup iterations and
 * 5 measurement iterations.
 *
 * @param args unused
 * @throws RunnerException propagated from the JMH runner
 */
public static void main(final String[] args) throws RunnerException {
    // empirically (on my computer) figures are very stable after 3 iterations
    final Runner runner = new Runner(
            new OptionsBuilder()
                    .include(Comparison.class.getSimpleName())
                    .mode(Mode.Throughput)
                    .forks(1)
                    .warmupIterations(6)
                    .measurementIterations(5)
                    .build());
    runner.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment