Created
August 2, 2019 12:38
-
-
Save plamut/8f996fdc9113e8de2b2c050befb36ff6 to your computer and use it in GitHub Desktop.
Unit test to catch a race condition in BackgroundConsumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 86fc0b465d27905be9d502aa2978e56ba135a53d Mon Sep 17 00:00:00 2001 | |
From: Peter Lamut <inbox@peterlamut.com> | |
Date: Fri, 2 Aug 2019 14:30:20 +0200 | |
Subject: [PATCH] Add unit test for race in BackgroundConsumer | |
--- | |
api_core/tests/unit/test_bidi.py | 51 ++++++++++++++++++++++++++++++++ | |
1 file changed, 51 insertions(+) | |
diff --git a/api_core/tests/unit/test_bidi.py b/api_core/tests/unit/test_bidi.py | |
index 4d185d3158e..73d74cc8519 100644 | |
--- a/api_core/tests/unit/test_bidi.py | |
+++ b/api_core/tests/unit/test_bidi.py | |
@@ -14,7 +14,9 @@ | |
import datetime | |
import logging | |
+import random | |
import threading | |
+import time | |
import grpc | |
import mock | |
@@ -692,6 +694,55 @@ class TestBackgroundConsumer(object): | |
bidi_rpc.close.assert_called_once() | |
assert consumer.is_active is False | |
+    def test_pause_recv_correctly_synchronized(self):
+        """Check that recv() is never in flight while the consumer is paused.
+
+        A helper thread repeatedly pauses and resumes the consumer while the
+        mocked ``recv()`` simulates a slow receive and then inspects
+        ``consumer.is_paused``. If it ever observes the paused state, it
+        flags the issue and the test fails. The test gives the race up to
+        10 seconds to show up.
+        """
+        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+        bidi_rpc.is_active = True
+
+        issue_detected_event = threading.Event()
+        terminate_pause_resume = threading.Event()
+
+        def recv_side_effect():
+            time.sleep(0.1)  # it takes time to receive something
+            if consumer.is_paused:
+                print("\x1b[1;36m recv() ISSUE detected\x1b[0m", flush=True)
+                issue_detected_event.set()
+                # will make consumer exit
+                raise RuntimeError("Receiving in paused state")
+
+            print("\x1b[36m recv() no issue detected so far\x1b[0m", flush=True)
+            return mock.sentinel.response_foo
+
+        bidi_rpc.recv.side_effect = recv_side_effect
+
+        # now create consumer and pause/resume it
+        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response=mock.Mock())
+        consumer.start()
+
+        # start another thread that randomly pauses/resumes the consumer
+        def pause_resume_consumer():
+            while not terminate_pause_resume.is_set():
+                consumer.pause()
+                time.sleep(random.random() / 10)
+                consumer.resume()
+                # Wait on the event rather than sleeping, so that the thread
+                # reacts to the termination request without delay.
+                terminate_pause_resume.wait(timeout=random.random() + 1)
+
+        pause_resume_thread = threading.Thread(
+            target=pause_resume_consumer, name="pause-resume-thread"
+        )
+        pause_resume_thread.start()
+
+        # wait up to 10 seconds for the issue to occur
+        issue_detected = issue_detected_event.wait(timeout=10)
+        terminate_pause_resume.set()
+        # Every loop iteration ends with resume(), so once the helper thread
+        # has been joined the consumer is left resumed and nothing else
+        # toggles its state while it is being shut down below.
+        pause_resume_thread.join()
+
+        # make the consumer exit
+        bidi_rpc.is_active = False
+        consumer._on_call_done(bidi_rpc)
+        while consumer.is_active:
+            # It may take a few cycles for the thread to exit; sleep briefly
+            # instead of spinning so the consumer thread gets to run.
+            time.sleep(0.01)
+
+        if issue_detected:
+            pytest.fail("BiDi recv() should not be called when consumer is paused.")
+ | |
def test_pause_resume_and_close(self): | |
# This test is relatively complex. It attempts to start the consumer, | |
# consume one item, pause the consumer, check the state of the world, | |
-- | |
2.17.1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment