-
-
Save rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.rmannibucau.beamvscompletionstage; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CompletionStage; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.state.Timer; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory; | |
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; | |
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; | |
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | |
import org.apache.beam.sdk.transforms.windowing.GlobalWindow; | |
import org.apache.beam.sdk.transforms.windowing.PaneInfo; | |
import org.apache.beam.sdk.values.PCollectionView; | |
import org.apache.beam.sdk.values.TupleTag; | |
import org.joda.time.Instant; | |
import org.openjdk.jmh.annotations.Benchmark; | |
import org.openjdk.jmh.annotations.Mode; | |
import org.openjdk.jmh.annotations.Scope; | |
import org.openjdk.jmh.annotations.Setup; | |
import org.openjdk.jmh.annotations.State; | |
import org.openjdk.jmh.annotations.TearDown; | |
import org.openjdk.jmh.infra.Blackhole; | |
import org.openjdk.jmh.runner.Runner; | |
import org.openjdk.jmh.runner.RunnerException; | |
import org.openjdk.jmh.runner.options.OptionsBuilder; | |
// | |
// use case is to have a string and compute its length | |
// this is done in a synchronous mode to be able to compare both beam and completionstage | |
// | |
@State(Scope.Thread) | |
public class Comparison { | |
private String element; | |
private LengthFn directFn; | |
private DoFnInvoker<String, Integer> directRunner; | |
private DoFnInvoker.ArgumentProvider<String, Integer> directArgs; | |
private CompletionStageLengthFn completionStageFn; | |
private DoFnInvoker<CompletionStage<String>, Integer> completionStageRunner; | |
private DoFnInvoker.ArgumentProvider<CompletionStage<String>, Integer> completionStageArgs; | |
private FastCompletionStageLengthFn fastCompletionStageFn; | |
private DoFnInvoker<CompletionStage<String>, Integer> fastCompletionStageRunner; | |
private DoFnInvoker.ArgumentProvider<CompletionStage<String>, Integer> fastCompletionStageArgs; | |
@Setup | |
public void init() { | |
element = "testrecord"; | |
directFn = new LengthFn(); | |
directRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(directFn); | |
directArgs = directFn.newArgProvider(element); | |
completionStageFn = new CompletionStageLengthFn(); | |
completionStageRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(completionStageFn); | |
completionStageArgs = completionStageFn.newArgProvider(element); | |
fastCompletionStageFn = new FastCompletionStageLengthFn(); | |
fastCompletionStageRunner = ByteBuddyDoFnInvokerFactory.only().invokerFor(fastCompletionStageFn); | |
fastCompletionStageArgs = fastCompletionStageFn.newArgProvider(element); | |
directRunner.invokeSetup(); | |
completionStageRunner.invokeSetup(); | |
// we skip the bundle since it is not that needed and requires an impl of contexts | |
} | |
@Benchmark | |
public void passthrough(final Blackhole blackhole) { | |
blackhole.consume(element); | |
} | |
@Benchmark | |
public void defaultCompletionStage(final Blackhole blackhole) { | |
blackhole.consume(CompletableFuture.completedFuture(element).thenApply(String::length)); | |
} | |
@Benchmark | |
public void fastCompletionStage(final Blackhole blackhole) { | |
blackhole.consume(new FastCompletionFuture<>(element).thenApply(String::length)); | |
} | |
@Benchmark | |
public void beam(final Blackhole blackhole) { | |
blackhole.consume(directRunner.invokeProcessElement(directArgs)); | |
} | |
@Benchmark | |
public void beamCompletionStage(final Blackhole blackhole) { | |
blackhole.consume(completionStageRunner.invokeProcessElement(completionStageArgs)); | |
} | |
@Benchmark | |
public void beamFastCompletionStage(final Blackhole blackhole) { | |
blackhole.consume(fastCompletionStageRunner.invokeProcessElement(fastCompletionStageArgs)); | |
} | |
@TearDown | |
public void release() { | |
// symmetrically we skip finish bundle | |
completionStageRunner.invokeTeardown(); | |
directRunner.invokeTeardown(); | |
} | |
public static class LengthFn extends ProcessContextFactoryDoFn<String, Integer> { | |
@ProcessElement | |
public void onElement(final ProcessContext context) { | |
context.output(context.element().length()); | |
} | |
@Override | |
protected String contextElement(final String element) { | |
return element; | |
} | |
} | |
public static class CompletionStageLengthFn extends ProcessContextFactoryDoFn<CompletionStage<String>, Integer> { | |
@ProcessElement | |
public void onElement(final ProcessContext context) { | |
context.element().thenAccept(e -> context.output(e.length())); | |
} | |
@Override | |
protected CompletionStage<String> contextElement(final String element) { | |
return CompletableFuture.completedFuture(element); | |
} | |
} | |
public static class FastCompletionStageLengthFn extends ProcessContextFactoryDoFn<CompletionStage<String>, Integer> { | |
@ProcessElement | |
public void onElement(final ProcessContext context) { | |
context.element().thenAccept(e -> context.output(e.length())); | |
} | |
@Override | |
protected CompletionStage<String> contextElement(final String element) { | |
return new FastCompletionFuture<>(element); | |
} | |
} | |
// intended for already resolved cases which don't need all that logic | |
public static class FastCompletionFuture<T> | |
extends CompletableFuture<T> /* lazy to impl everything, normally impl CompletionStage */ { | |
private final T value; | |
public FastCompletionFuture(final T value) { | |
this.value = value; | |
} | |
@Override | |
public CompletableFuture<Void> thenAccept(final Consumer<? super T> action) { | |
action.accept(value); | |
// we can cast since the param will be typed and not usable anyway | |
return (CompletableFuture<Void>) this; | |
} | |
@Override | |
public <U> CompletableFuture<U> thenApply(final Function<? super T, ? extends U> fn) { | |
return new FastCompletionFuture<>(fn.apply(value)); | |
} | |
} | |
public static abstract class ProcessContextFactoryDoFn<A, B> extends DoFn<A, B> { | |
protected abstract A contextElement(String element); | |
public DoFn<A, B>.ProcessContext newProcessContext(final A element, final Instant now, final PipelineOptions options) { | |
return new ProcessContextImpl(element, now, options); | |
} | |
public DoFnInvoker.ArgumentProvider<A, B> newArgProvider(final String element) { | |
final A convertedElement = contextElement(element); | |
return new DoFnInvoker.ArgumentProvider<A, B>() { | |
private final PipelineOptions options = PipelineOptionsFactory.create(); | |
private final Instant now = Instant.now(); | |
@Override | |
public BoundedWindow window() { | |
return GlobalWindow.INSTANCE; | |
} | |
@Override | |
public PipelineOptions pipelineOptions() { | |
return options; | |
} | |
@Override | |
public DoFn<A, B>.ProcessContext processContext(final DoFn<A, B> doFn) { | |
return newProcessContext(convertedElement, now, options); | |
} | |
@Override | |
public DoFn<A, B>.StartBundleContext startBundleContext(final DoFn<A, B> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public DoFn<A, B>.FinishBundleContext finishBundleContext(final DoFn<A, B> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public DoFn<A, B>.OnTimerContext onTimerContext(final DoFn<A, B> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public RestrictionTracker<?, ?> restrictionTracker() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public org.apache.beam.sdk.state.State state(final String stateId) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Timer timer(final String timerId) { | |
throw new UnsupportedOperationException(); | |
} | |
}; | |
} | |
private class ProcessContextImpl extends ProcessContext { | |
private final A element; | |
private final Instant now; | |
private final PipelineOptions options; | |
private ProcessContextImpl(final A element, final Instant now, final PipelineOptions options) { | |
this.element = element; | |
this.now = now; | |
this.options = options; | |
} | |
@Override | |
public PipelineOptions getPipelineOptions() { | |
return options; | |
} | |
@Override | |
public void output(final B output) { | |
// no-op: this is NOT the beam impl so it makes run faster than expected | |
} | |
@Override | |
public void outputWithTimestamp(final B output, final org.joda.time.Instant timestamp) { | |
throw new UnsupportedOperationException( | |
"in real this will create a wrapper exactly like a CompletionFuture.completed"); | |
} | |
@Override | |
public <T> void output(final TupleTag<T> tag, T output) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public <T> void outputWithTimestamp(final TupleTag<T> tag, final T output, final org.joda.time.Instant timestamp) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public A element() { | |
return element; | |
} | |
@Override | |
public <T> T sideInput(final PCollectionView<T> view) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public org.joda.time.Instant timestamp() { | |
return now; | |
} | |
@Override | |
public PaneInfo pane() { | |
return null; | |
} | |
@Override | |
public void updateWatermark(final org.joda.time.Instant watermark) { | |
// no-op | |
} | |
} | |
} | |
public static class BaseArgProvider<T> implements DoFnInvoker.ArgumentProvider<T, Integer> { | |
private final PipelineOptions options = PipelineOptionsFactory.create(); | |
private final Instant now = Instant.now(); | |
private final T element; | |
private final ProcessContextFactoryDoFn<T, Integer> fn; | |
public BaseArgProvider(final T element, final ProcessContextFactoryDoFn<T, Integer> fn) { | |
this.element = element; | |
this.fn = fn; | |
} | |
@Override | |
public BoundedWindow window() { | |
return GlobalWindow.INSTANCE; | |
} | |
@Override | |
public PipelineOptions pipelineOptions() { | |
return options; | |
} | |
@Override | |
public DoFn<T, Integer>.ProcessContext processContext(final DoFn<T, Integer> doFn) { | |
return fn.newProcessContext(element, now, options); | |
} | |
@Override | |
public DoFn<T, Integer>.StartBundleContext startBundleContext(final DoFn<T, Integer> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public DoFn<T, Integer>.FinishBundleContext finishBundleContext(final DoFn<T, Integer> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public DoFn<T, Integer>.OnTimerContext onTimerContext(final DoFn<T, Integer> doFn) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public RestrictionTracker<?, ?> restrictionTracker() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public org.apache.beam.sdk.state.State state(final String stateId) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Timer timer(final String timerId) { | |
throw new UnsupportedOperationException(); | |
} | |
} | |
public static void main(final String[] args) throws RunnerException { | |
// empirically (on my computer) figures are very stable after 3 iterations | |
new Runner(new OptionsBuilder().warmupIterations(6).measurementIterations(5).mode(Mode.Throughput).forks(1) | |
.include(Comparison.class.getSimpleName()).build()).run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment