Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save plamut/8f996fdc9113e8de2b2c050befb36ff6 to your computer and use it in GitHub Desktop.
Save plamut/8f996fdc9113e8de2b2c050befb36ff6 to your computer and use it in GitHub Desktop.
Unit test to catch a race condition in BackgroundConsumer
From 86fc0b465d27905be9d502aa2978e56ba135a53d Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Fri, 2 Aug 2019 14:30:20 +0200
Subject: [PATCH] Add unit test for race in BackgroundConsumer
---
api_core/tests/unit/test_bidi.py | 51 ++++++++++++++++++++++++++++++++
1 file changed, 51 insertions(+)
diff --git a/api_core/tests/unit/test_bidi.py b/api_core/tests/unit/test_bidi.py
index 4d185d3158e..73d74cc8519 100644
--- a/api_core/tests/unit/test_bidi.py
+++ b/api_core/tests/unit/test_bidi.py
@@ -14,7 +14,9 @@
import datetime
import logging
+import random
import threading
+import time
import grpc
import mock
@@ -692,6 +694,55 @@ class TestBackgroundConsumer(object):
bidi_rpc.close.assert_called_once()
assert consumer.is_active is False
+    def test_pause_recv_correctly_synchronized(self):
+        """Check that recv() is never invoked while the consumer is paused."""
+        bidi_rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+        bidi_rpc.is_active = True
+        issue_detected_event = threading.Event()
+        terminate_pause_resume = threading.Event()
+
+        def recv_side_effect():
+            time.sleep(0.1)  # it takes time to receive something
+            if consumer.is_paused:
+                print("\x1b[1;36m recv() ISSUE detected\x1b[0m", flush=True)
+                issue_detected_event.set()
+                raise RuntimeError("Receiving in paused state")  # will make consumer exit
+            print("\x1b[36m recv() no issue detected so far\x1b[0m", flush=True)
+            return mock.sentinel.response_foo
+
+        bidi_rpc.recv.side_effect = recv_side_effect
+
+        # now create consumer and pause/resume it
+        consumer = bidi.BackgroundConsumer(bidi_rpc, on_response=mock.Mock())
+        consumer.start()
+
+        # start another thread that randomly pauses/resumes the consumer
+        def pause_resume_consumer():
+            while not terminate_pause_resume.is_set():
+                consumer.pause()
+                time.sleep(random.random() / 10)
+                consumer.resume()
+                terminate_pause_resume.wait(random.random() + 1)  # wakes early on shutdown
+
+        pause_resume_thread = threading.Thread(
+            target=pause_resume_consumer, name="pause-resume-thread"
+        )
+        pause_resume_thread.start()
+
+        # wait up to 10 seconds for the issue to occur
+        issue_detected = issue_detected_event.wait(timeout=10)
+        terminate_pause_resume.set()
+        pause_resume_thread.join()  # no more pause()/resume() calls during shutdown
+
+        # make the consumer exit
+        bidi_rpc.is_active = False
+        consumer._on_call_done(bidi_rpc)
+        while consumer.is_active:
+            time.sleep(0.01)  # It may take a few cycles for the thread to exit.
+
+        if issue_detected:
+            pytest.fail("BiDi recv() should not be called when consumer is paused.")
+
def test_pause_resume_and_close(self):
# This test is relatively complex. It attempts to start the consumer,
# consume one item, pause the consumer, check the state of the world,
--
2.17.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment