Skip to content

Instantly share code, notes, and snippets.

@robertwb
Last active April 22, 2017 01:12
Show Gist options
  • Save robertwb/5b64829fa91d9a61f55886e5ff6b1f8c to your computer and use it in GitHub Desktop.
Save robertwb/5b64829fa91d9a61f55886e5ff6b1f8c to your computer and use it in GitHub Desktop.
# pip install cython
# python setup.py build_ext --inplace; python bench-runner.py
import time
import timeit

import apache_beam as beam
from apache_beam.runners import common
from apache_beam.utils.windowed_value import WindowedValue
class MyDoFn(beam.DoFn):
    """Trivial DoFn used as the per-step work in the benchmark.

    ``process`` ignores its input and returns the one-element tuple
    ``(None,)``, i.e. a single None output for every element.
    """

    @staticmethod
    def process(element):
        # Fixed one-element result; the input element is deliberately unused.
        outputs = (None,)
        return outputs
class WindowingStub(object):
    """Minimal stand-in for the ``windowing`` argument of DoFnRunner.

    It exposes only a ``windowfn`` class attribute, set to None.
    """

    # No window function is configured for the benchmark.
    windowfn = None
class NullReceiver(common.Receiver):
    """Receiver that discards every value handed to it.

    Used as the terminal sink of the benchmark's runner chain.
    """

    def receive(self, unused):
        # Intentional sink: drop the value and return nothing.
        return None
def create_runner(receiver=None):
    """Build a DoFnRunner that applies MyDoFn and forwards output to *receiver*.

    Args:
      receiver: downstream object registered under the ``None`` tag in
        ``tagged_receivers``; the benchmark passes either a NullReceiver or
        another DoFnRunner here to build a chain. When omitted (or None), a
        fresh NullReceiver is created for this runner.

    Returns:
      A ``common.DoFnRunner`` with no extra args/kwargs, no side inputs,
      no logger, and no scoped metrics container.
    """
    # Sentinel instead of `receiver=NullReceiver()`: a default-argument
    # instance would be built once at import time and shared by every call.
    if receiver is None:
        receiver = NullReceiver()
    return common.DoFnRunner(
        MyDoFn(),
        args=(),
        kwargs={},
        side_inputs={},
        windowing=WindowingStub(),
        context=None,
        tagged_receivers={None: receiver},
        logger=None,
        step_name='step',
        logging_context=common.LoggingContext(),
        state=common.DoFnState(None),
        scoped_metrics_container=None)
NUM_STEPS = 100
NUM_TIMINGS = 10
NUM_ELEMENTS = 10000

# Fused graph: each DoFnRunner feeds the previously built one. Note this
# builds NUM_STEPS + 1 runners (the initial one plus NUM_STEPS wrappers).
runner = create_runner()
for _ in range(NUM_STEPS):
    runner = create_runner(runner)

# A single element in the global window, pushed through the chain
# NUM_ELEMENTS times per timing run.
w = WindowedValue(None, 100, (common.GlobalWindow(),))
# Built once, outside the timed region, so the timings don't include
# allocating the iteration sequence (range() returns a list on Python 2).
batch = [w] * NUM_ELEMENTS

all_elapsed = []
for _ in range(NUM_TIMINGS):
    # timeit.default_timer selects the most precise wall clock available
    # for the platform/Python version (time.perf_counter on Python 3).
    start = timeit.default_timer()
    # NOTE(review): only the outermost runner is started/finished; the inner
    # runners never get start()/finish(). Presumably harmless because MyDoFn
    # defines only process() -- confirm if MyDoFn grows bundle hooks.
    runner.start()
    for windowed_value in batch:
        runner.process(windowed_value)
    # The measured span covers runner.start() plus the process loop, but
    # not runner.finish().
    elapsed = timeit.default_timer() - start
    runner.finish()
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(elapsed)
    all_elapsed.append(elapsed)
print(sorted(all_elapsed))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment