Skip to content

Instantly share code, notes, and snippets.

@jcruelty
Created March 28, 2019 21:27
Show Gist options
  • Save jcruelty/3bf5ce5865110372a2d1650b1421cde1 to your computer and use it in GitHub Desktop.
Save jcruelty/3bf5ce5865110372a2d1650b1421cde1 to your computer and use it in GitHub Desktop.
Demo of unexpected behavior of watermark timers with TestStream
"""
This demonstrates the behavior of TestStream with stateful-processing timers.

Expected behavior:
When using TestStream, suppose a stateful DoFn adds the value 'foo' to a BagState and then sets two watermark
timers, t1 and t2, where t1 clears the bag state and t2 reads the bag state and emits its contents. If t1 < t2,
nothing should be emitted when the TestPipeline is run, because the bag state should be cleared by the timer at t1
before it is read by the timer at t2.

Actual behavior:
For the scenario described above, results are emitted even though t1 is less than t2.

This seems like a bug; either that, or we are misunderstanding this API. The goal is to test that watermark timers
work as expected.
"""
from __future__ import absolute_import
import unittest
import apache_beam as beam
from apache_beam.coders import BytesCoder
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import on_timer
from apache_beam.transforms.userstate import TimeDomain
from apache_beam.transforms.userstate import TimerSpec
class StatefulDoFn(beam.DoFn):
    """Stateful DoFn that buffers values in bag state and arms two watermark timers.

    Every processed element has its value appended to ``BAG_STATE``; the
    clear timer is then set to 100 and the emit timer to 1000. The clear
    timer's callback empties the bag, while the emit timer's callback yields
    whatever the bag contains when it runs.
    """

    # State and timer declarations; referenced by the parameter defaults and
    # the @on_timer decorators below.
    BAG_STATE = BagStateSpec('bag_state', BytesCoder())
    EMIT_TIMER = TimerSpec('emit_timer', TimeDomain.WATERMARK)
    CLEAR_TIMER = TimerSpec('clear_timer', TimeDomain.WATERMARK)

    def process(
            self,
            element,
            bag_state=beam.DoFn.StateParam(BAG_STATE),  # noqa: B008
            emit_timer=beam.DoFn.TimerParam(EMIT_TIMER),  # noqa: B008
            clear_timer=beam.DoFn.TimerParam(CLEAR_TIMER)):  # noqa: B008
        """Store the element's value and set both timers.

        Args:
          element: a two-item ``(key, value)`` pair; only the value is stored.
          bag_state: handle for ``BAG_STATE``.
          emit_timer: handle for ``EMIT_TIMER``.
          clear_timer: handle for ``CLEAR_TIMER``.
        """
        # The key is not used here; unpacking still requires a two-item element.
        _unused_key, payload = element
        bag_state.add(payload)

        # The clear timer (100) is set earlier than the emit timer (1000).
        clear_timer.set(100)
        emit_timer.set(1000)

    @on_timer(EMIT_TIMER)
    def emit_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):  # noqa: B008
        """Yield each value currently held in the bag state."""
        for buffered in bag_state.read():
            yield buffered

    @on_timer(CLEAR_TIMER)
    def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):  # noqa: B008
        """Empty the bag state."""
        bag_state.clear()
class StatefulDoFnTest(unittest.TestCase):
    """Checks the ordering of StatefulDoFn's watermark timers under TestStream.

    Elements that reach the end of the pipeline are collected in the
    class-level list ``all_records``, which ``setUp`` resets before each test.
    """
    # pylint: disable=expression-not-assigned

    # Shared sink for pipeline output; replaced with a fresh list in setUp.
    all_records = None

    def setUp(self):
        """Start every test with an empty record list."""
        StatefulDoFnTest.all_records = []

    def record_dofn(self):
        """Return a DoFn that appends each element it sees to ``all_records``."""
        # NOTE(review): presumably relies on the runner executing in the same
        # process so that the class-level list is shared -- confirm.
        class RecordDoFn(beam.DoFn):
            def process(self, element):
                StatefulDoFnTest.all_records.append(element)

        return RecordDoFn()

    def test_watermark_timer_should_not_fire_until_time_reached(self):
        """Expect no output: clear_timer (100) should empty the bag before emit_timer (1000) reads it."""
        with TestPipeline() as p:
            test_stream = (TestStream()
                           .advance_watermark_to(0)
                           .add_elements([('key', 'value')])  # adds value to bagstate, clear_timer=100, emit_timer=1000
                           .advance_watermark_to(100)  # clear_timer should fire, clearing state
                           )
            (p
             | test_stream
             | beam.ParDo(StatefulDoFn())
             | beam.ParDo(self.record_dofn())
             )

        # The check sits after the ``with`` block on purpose: the pipeline is
        # run when the context manager exits, so checking inside the block
        # would always see an empty list.
        #
        # bag state should have been cleared at watermark 100, when clear_timer fired.
        # emit_timer fires at final watermark, but bag state should be empty at that point so no records should be
        # emitted. but in practice 'value' does get emitted :(
        #
        # assertEqual (rather than a bare assert) is not stripped under
        # ``python -O`` and reports the unexpected records on failure.
        self.assertEqual(StatefulDoFnTest.all_records, [])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment