@toddlipcon
Created August 16, 2016 04:27
commit 93623f87c137cdedb69bfe171b82d17aede185f3
Author: dralves <dralves@apache.org>
Date: Mon Jul 11 14:01:35 2016 -0700
Add time/watermark-based garbage collection to ResultTracker
This adds time- and watermark-based garbage collection to the
ResultTracker. Regarding time-based GC, there are two TTLs: a client
TTL and a response TTL.
After the response TTL has elapsed, we garbage-collect responses,
but the ResultTracker remembers that it no longer knows them, so if
the client retries a request older than that, it gets a meaningful
error back stating that the request is stale.
After the client TTL elapses without our hearing from a client, we
GC that client's state entirely, meaning all subsequent requests
from that client will be treated as new.
Regarding watermark-based GC, the algorithm is simple: we trust the
client to tell us its lowest incomplete sequence number, and we GC
everything below that.
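
For illustration, here is a minimal sketch of the three GC rules
above. The types and names in it (CachedResponse, ClientState,
GcResults) are invented for explanation and are not the actual
kudu::rpc::ResultTracker API.

#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

// Stand-in for a cached response protobuf plus its completion time.
struct CachedResponse {
  int64_t completed_at_ms;
  std::string response;
};

struct ClientState {
  int64_t last_heard_from_ms = 0;       // last contact with this client
  int64_t first_incomplete_seq_no = 0;  // watermark reported by the client
  // Responses with sequence numbers below this were GCed; a retry of one
  // of them gets a "request is stale" error instead of re-executing.
  int64_t stale_before_seq_no = 0;
  std::map<int64_t, CachedResponse> responses;  // keyed by sequence number
};

void GcResults(std::map<std::string, ClientState>* clients, int64_t now_ms,
               int64_t response_ttl_ms, int64_t client_ttl_ms) {
  for (auto it = clients->begin(); it != clients->end();) {
    ClientState& c = it->second;
    // Client GC: after 'client_ttl_ms' without hearing from the client,
    // forget it entirely; all of its requests are then treated as new.
    if (now_ms - c.last_heard_from_ms > client_ttl_ms) {
      it = clients->erase(it);
      continue;
    }
    for (auto r = c.responses.begin(); r != c.responses.end();) {
      if (r->first < c.first_incomplete_seq_no) {
        // Watermark GC: the client reported that it is done with this
        // response, so drop it without remembering anything.
        r = c.responses.erase(r);
      } else if (now_ms - r->second.completed_at_ms > response_ttl_ms) {
        // Response GC: drop the response but remember that it existed,
        // so a late retry gets a meaningful "stale" error back.
        c.stale_before_seq_no = std::max(c.stale_before_seq_no, r->first + 1);
        r = c.responses.erase(r);
      } else {
        ++r;
      }
    }
    ++it;
  }
}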
This adds a simple test that makes sure the basics work, and a
multithreaded test that runs GC concurrently with writes.
NOTE: this does not wire the time-based garbage collection process into
the server itself -- it's currently only triggered by the included
tests.
Original patch by David.
Some changes by Todd.
Change-Id: I2c8e7b7191ca14842a31b64813ed498bdf626fa8
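
On the client side, the watermark above comes from the client tracking
which of its own requests are still in flight. Below is a hypothetical
sketch of that bookkeeping; it mirrors the request_tracker_->NewSeqNo()
and RpcCompleted() calls used in the test further down, but it is not
the actual kudu::rpc::RequestTracker class.

#include <cstdint>
#include <set>

// Hypothetical client-side tracker, for explanation only.
class SimpleRequestTracker {
 public:
  // Assigns a fresh sequence number to an outgoing request.
  int64_t NewSeqNo() {
    int64_t seq = next_++;
    in_flight_.insert(seq);
    return seq;
  }
  // Called once a request's response has been received and processed.
  void RpcCompleted(int64_t seq) { in_flight_.erase(seq); }
  // Attached to outgoing requests: the server may safely GC all cached
  // responses below this watermark.
  int64_t FirstIncompleteSeqNo() const {
    return in_flight_.empty() ? next_ : *in_flight_.begin();
  }

 private:
  int64_t next_ = 0;
  std::set<int64_t> in_flight_;  // sequence numbers not yet completed
};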
diff --git a/src/kudu/rpc/rpc-stress-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
similarity index 59%
rename from src/kudu/rpc/rpc-stress-test.cc
rename to src/kudu/rpc/exactly_once_rpc-test.cc
index 2d1215e..3f3f59e 100644
--- a/src/kudu/rpc/rpc-stress-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -19,6 +19,9 @@
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc-test-base.h"
+DECLARE_int64(remember_clients_ttl_ms);
+DECLARE_int64(remember_responses_ttl_ms);
+
using std::atomic_int;
using std::shared_ptr;
using std::unique_ptr;
@@ -58,6 +61,8 @@ class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
CalculatorServiceProxy* proxy_;
};
+} // anonymous namespace
+
class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
ExactlyOnceRequestPB,
ExactlyOnceResponsePB> {
@@ -67,10 +72,12 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
const MonoTime& deadline,
const shared_ptr<Messenger>& messenger,
int value,
- CountDownLatch* latch)
+ CountDownLatch* latch,
+ int server_sleep = 0)
: RetriableRpc(server_picker, request_tracker, deadline, messenger), latch_(latch) {
req_.set_value_to_add(value);
req_.set_randomly_fail(true);
+ req_.set_sleep_for_ms(server_sleep);
}
void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
@@ -84,18 +91,21 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
// We shouldn't get errors from the server/rpc system since we set a high timeout.
CHECK_OK(rpc_cb_status);
- RetriableRpcStatus status;
if (!mutable_retrier()->controller().status().ok()) {
CHECK(mutable_retrier()->controller().status().IsRemoteError());
- status.result = RetriableRpcStatus::SERVER_BUSY;
- return status;
+ if (mutable_retrier()->controller().error_response()->code()
+ == ErrorStatusPB::ERROR_REQUEST_STALE) {
+ return { RetriableRpcStatus::NON_RETRIABLE_ERROR,
+ mutable_retrier()->controller().status() };
+ } else {
+ return { RetriableRpcStatus::SERVER_BUSY, mutable_retrier()->controller().status() };
+ }
}
// If the controller is not finished we're in the ReplicaFoundCb() callback.
// Return ok to proceed with the call to the server.
if (!mutable_retrier()->mutable_controller()->finished()) {
- status.result = RetriableRpcStatus::OK;
- return status;
+ return { RetriableRpcStatus::OK, Status::OK() };
}
// If we've received a response in the past, all following responses must
@@ -106,18 +116,20 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
CHECK_EQ(successful_response_.DebugString(), resp_.DebugString());
}
- // Still report errors, with some probability. This will cause requests to
- // be retried. Since the requests were originally successful we should get
- // the same reply back.
- int random = rand() % 4;
- switch (random) {
- case 0: status.result = RetriableRpcStatus::SERVER_BUSY; break;
- case 1: status.result = RetriableRpcStatus::RESOURCE_NOT_FOUND; break;
- case 2: status.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE; break;
- case 3: status.result = RetriableRpcStatus::OK; break;
- default: LOG(FATAL) << "Unexpected value";
+ if (sometimes_retry_successful_) {
+ // Still report errors, with some probability. This will cause requests to
+ // be retried. Since the requests were originally successful we should get
+ // the same reply back.
+ int random = rand() % 4;
+ switch (random) {
+ case 0: return { RetriableRpcStatus::SERVER_BUSY, Status::RemoteError("") };
+ case 1: return { RetriableRpcStatus::RESOURCE_NOT_FOUND, Status::RemoteError("") };
+ case 2: return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, Status::RemoteError("") };
+ case 3: return { RetriableRpcStatus::OK, Status::OK() };
+ default: LOG(FATAL) << "Unexpected value";
+ }
}
- return status;
+ return { RetriableRpcStatus::OK, Status::OK() };
}
void Finish(const Status& status) override {
@@ -129,14 +141,17 @@ class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
std::string ToString() const override { return "test-rpc"; }
CountDownLatch* latch_;
ExactlyOnceResponsePB successful_response_;
+ bool sometimes_retry_successful_ = true;
};
-} // namespace
-
-class RpcStressTest : public RpcTestBase {
+class ExactlyOnceRpcTest : public RpcTestBase {
public:
void SetUp() override {
RpcTestBase::SetUp();
+ SeedRandom();
+ }
+
+ void StartServer() {
// Set up server.
StartTestServerWithGeneratedCode(&server_addr_);
client_messenger_ = CreateMessenger("Client");
@@ -151,7 +166,8 @@ class RpcStressTest : public RpcTestBase {
RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
const scoped_refptr<RequestTracker>& request_tracker,
const shared_ptr<Messenger>& messenger,
- int value) : latch_(1) {
+ int value,
+ int server_sleep = 0) : latch_(1) {
MonoTime now = MonoTime::Now();
now.AddDelta(MonoDelta::FromMilliseconds(10000));
rpc_ = new CalculatorServiceRpc(server_picker,
@@ -231,6 +247,93 @@ class RpcStressTest : public RpcTestBase {
request_tracker_->RpcCompleted(seq_no);
}
+ // Continuously runs GC on the ResultTracker.
+ void RunGcThread(MonoDelta run_for) {
+ MonoTime run_until = MonoTime::Now();
+ run_until.AddDelta(run_for);
+ while (MonoTime::Now().ComesBefore(run_until)) {
+ result_tracker_->GCResults();
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+ }
+ }
+
+
+ // This continuously issues calls to the server that often last longer than
+ // 'remember_responses_ttl_ms', making sure that we don't get errors back.
+ void DoLongWritesThread(MonoDelta run_for) {
+ MonoTime run_until = MonoTime::Now();
+ run_until.AddDelta(run_for);
+ int counter = 0;
+ while (MonoTime::Now().ComesBefore(run_until)) {
+ unique_ptr<RetriableRpcExactlyOnceAdder> adder(new RetriableRpcExactlyOnceAdder(
+ test_picker_, request_tracker_, client_messenger_, 1,
+ rand() % (2 * FLAGS_remember_responses_ttl_ms)));
+
+ // This thread is used in the stress test where we're constantly running GC.
+ // So, once we get a "success" response, it's likely that the result will be
+ // GCed on the server side, and thus it's not safe to spuriously retry.
+ adder->rpc_->sometimes_retry_successful_ = false;
+ adder->SleepAndSend();
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+ counter++;
+ }
+ ExactlyOnceResponsePB response;
+ ResultTracker::SequenceNumber sequence_number;
+ CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+ CHECK_OK(MakeAddCall(sequence_number, 0, &response));
+ CHECK_EQ(response.current_val(), counter);
+ request_tracker_->RpcCompleted(sequence_number);
+ }
+
+ // Stubbornly sends the same request to the server; this should observe three states:
+ // the request should be successful at first, then its result should be GCed, and
+ // finally the client itself should be GCed.
+ void StubbornlyWriteTheSameRequestThread(MonoDelta run_for) {
+ MonoTime run_until = MonoTime::Now();
+ run_until.AddDelta(run_for);
+ // Make an initial request, so that we get a response to compare to.
+ ResultTracker::SequenceNumber sequence_number;
+ CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+ ExactlyOnceResponsePB original_response;
+ CHECK_OK(MakeAddCall(sequence_number, 0, &original_response));
+
+ // Now repeat the same request. At first we should get the same response, then the result
+ // should be GCed and we should get STALE back. Finally the request should succeed again
+ // but we should get a new response.
+ bool result_gced = false;
+ bool client_gced = false;
+ while (MonoTime::Now().ComesBefore(run_until)) {
+ ExactlyOnceResponsePB response;
+ Status s = MakeAddCall(sequence_number, 0, &response);
+ if (s.ok()) {
+ if (!result_gced) {
+ CHECK_EQ(response.ShortDebugString(), original_response.ShortDebugString());
+ } else {
+ client_gced = true;
+ CHECK_NE(response.ShortDebugString(), original_response.ShortDebugString());
+ }
+ SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+ } else if (s.IsRemoteError()) {
+ result_gced = true;
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms * 2));
+ }
+ }
+ CHECK(result_gced);
+ CHECK(client_gced);
+ }
+
+ Status MakeAddCall(ResultTracker::SequenceNumber sequence_number,
+ int value_to_add,
+ ExactlyOnceResponsePB* response,
+ int attempt_no = -1) {
+ RpcController controller;
+ ExactlyOnceRequestPB req;
+ req.set_value_to_add(value_to_add);
+ if (attempt_no == -1) attempt_no = attempt_nos_.fetch_add(1);
+ AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+ Status s = proxy_->AddExactlyOnce(req, response, &controller);
+ return s;
+ }
protected:
Sockaddr server_addr_;
@@ -242,8 +345,9 @@ class RpcStressTest : public RpcTestBase {
};
// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
-// same sequence number as previous request.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
+// same sequence number as previous requests.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
+ StartServer();
ExactlyOnceResponsePB original_resp;
int mem_consumption = mem_tracker_->consumption();
{
@@ -312,7 +416,8 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
// the server side is instructed to spuriously fail attempts.
// In CalculatorServiceRpc we make sure that the same response is returned by all retries and,
// after all the rpcs are done, we make sure that the final result is the expected one.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+ StartServer();
int kNumIterations = 10;
int kNumRpcs = 10;
@@ -341,7 +446,8 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
// Performs a series of requests in which each single request is attempted by multiple threads.
// On each iteration, after all the threads complete, we expect that the add operation was
// executed exactly once.
-TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
+ StartServer();
int kNumIterations = 10;
int kNumThreads = 10;
@@ -355,15 +461,8 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
int single_response_size = 0;
// Measure memory consumption for a single response from the same client.
- {
- RpcController controller;
- ExactlyOnceRequestPB req;
- ExactlyOnceResponsePB resp;
- req.set_value_to_add(1);
- AddRequestId(&controller, kClientId, sequence_number, 0);
- ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
- single_response_size = resp.SpaceUsed();
- }
+ ExactlyOnceResponsePB resp;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
for (int i = 1; i <= kNumIterations; i ++) {
vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
@@ -398,5 +497,90 @@ TEST_F(RpcStressTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
}
}
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
+ FLAGS_remember_clients_ttl_ms = 1000;
+ FLAGS_remember_responses_ttl_ms = 100;
+
+ StartServer();
+
+ // Make a request.
+ ExactlyOnceResponsePB original;
+ ResultTracker::SequenceNumber sequence_number = 0;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &original));
+
+ // Making the same request again should return the same response.
+ ExactlyOnceResponsePB resp;
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+ ASSERT_EQ(original.ShortDebugString(), resp.ShortDebugString());
+
+ // Now sleep for 'remember_responses_ttl_ms' and run GC; we should then
+ // get a STALE error back.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_responses_ttl_ms));
+ int64_t memory_consumption = mem_tracker_->consumption();
+ result_tracker_->GCResults();
+ ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+ resp.Clear();
+ Status s = MakeAddCall(sequence_number, 1, &resp);
+ ASSERT_TRUE(s.IsRemoteError());
+ ASSERT_STR_CONTAINS(s.ToString(), "is stale");
+
+ // Sleep again, this time for 'remember_clients_ttl_ms' and run GC again.
+ // The request should be successful, but its response should be a new one.
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms));
+ memory_consumption = mem_tracker_->consumption();
+ result_tracker_->GCResults();
+ ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+ resp.Clear();
+ ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+ ASSERT_NE(resp.ShortDebugString(), original.ShortDebugString());
+}
+
+// This test creates a thread that continuously makes requests to the server, some lasting
+// longer than the GC period, while GC runs at the same time, making sure that the
+// corresponding CompletionRecords/ClientStates are not deleted from underneath the
+// ongoing requests.
+// It also creates a thread that runs GC very frequently and another thread that sends the
+// same request over and over, observing the possible states: request is ok, request is
+// stale, request is ok again (because the client was forgotten).
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
+ FLAGS_remember_clients_ttl_ms = 100;
+ FLAGS_remember_responses_ttl_ms = 10;
+
+ StartServer();
+
+ // The write thread runs for the shortest period to make sure client GC has a
+ // chance to run.
+ MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
+ MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
+ // GC runs for the longest period because the stubborn thread may sleep past its
+ // deadline while waiting for client GC.
+ MonoDelta gc_run_for = MonoDelta::FromSeconds(4);
+ if (AllowSlowTests()) {
+ writes_run_for = MonoDelta::FromSeconds(10);
+ stubborn_run_for = MonoDelta::FromSeconds(11);
+ gc_run_for = MonoDelta::FromSeconds(12);
+ }
+
+ scoped_refptr<kudu::Thread> gc_thread;
+ scoped_refptr<kudu::Thread> write_thread;
+ scoped_refptr<kudu::Thread> stubborn_thread;
+ CHECK_OK(kudu::Thread::Create(
+ "gc", "gc", &ExactlyOnceRpcTest::RunGcThread, this, gc_run_for, &gc_thread));
+ CHECK_OK(kudu::Thread::Create(
+ "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
+ this, writes_run_for, &write_thread));
+ CHECK_OK(kudu::Thread::Create(
+ "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
+ this, stubborn_run_for, &stubborn_thread));
+
+ gc_thread->Join();
+ write_thread->Join();
+ stubborn_thread->Join();
+
+ result_tracker_->GCResults();
+ ASSERT_EQ(0, mem_tracker_->consumption());
+}
+
} // namespace rpc
} // namespace kudu