Skip to content

Instantly share code, notes, and snippets.

@jmarantz
Created May 5, 2021 22:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmarantz/46059fac142b0d089174a23127b6e1b1 to your computer and use it in GitHub Desktop.
Save jmarantz/46059fac142b0d089174a23127b6e1b1 to your computer and use it in GitHub Desktop.
diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h
index 8ff2ca99c8..f13be54cba 100644
--- a/include/envoy/thread_local/thread_local.h
+++ b/include/envoy/thread_local/thread_local.h
@@ -226,6 +226,8 @@ public:
* @return Event::Dispatcher& the thread local dispatcher.
*/
virtual Event::Dispatcher& dispatcher() PURE;
+
+ virtual bool isShutdown() const { return false; }
};
} // namespace ThreadLocal
diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc
index 46aed025cc..be9811a1ea 100644
--- a/source/common/stats/thread_local_store.cc
+++ b/source/common/stats/thread_local_store.cc
@@ -194,11 +194,15 @@ void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_di
tls_cache_ = ThreadLocal::TypedSlot<TlsCache>::makeUnique(tls);
tls_cache_->set(
[](Event::Dispatcher&) -> std::shared_ptr<TlsCache> { return std::make_shared<TlsCache>(); });
+ tls_ = tls;
}
void ThreadLocalStoreImpl::shutdownThreading() {
// This will block both future cache fills as well as cache flushes.
shutting_down_ = true;
+ if (tls_.has_value()) {
+ ASSERT(tls_->isShutdown());
+ }
Thread::LockGuard lock(hist_mutex_);
for (ParentHistogramImpl* histogram : histogram_set_) {
histogram->setShuttingDown(true);
diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h
index 946a84ed87..4516fbb826 100644
--- a/source/common/stats/thread_local_store.h
+++ b/source/common/stats/thread_local_store.h
@@ -499,6 +499,7 @@ private:
std::atomic<bool> shutting_down_{};
std::atomic<bool> merge_in_progress_{};
AllocatorImpl heap_allocator_;
+ OptRef<ThreadLocal::Instance> tls_;
NullCounterImpl null_counter_;
NullGaugeImpl null_gauge_;
diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h
index db9cd63e10..c9df63b1e0 100644
--- a/source/common/thread_local/thread_local_impl.h
+++ b/source/common/thread_local/thread_local_impl.h
@@ -28,6 +28,7 @@ public:
void shutdownGlobalThreading() override;
void shutdownThread() override;
Event::Dispatcher& dispatcher() override;
+ bool isShutdown() const override { return shutdown_; }
private:
// On destruction returns the slot index to the deferred delete queue (detaches it). This allows
diff --git a/test/common/stats/thread_local_store_speed_test.cc b/test/common/stats/thread_local_store_speed_test.cc
index 5208fdaed9..a7e84f7343 100644
--- a/test/common/stats/thread_local_store_speed_test.cc
+++ b/test/common/stats/thread_local_store_speed_test.cc
@@ -37,9 +37,11 @@ public:
for (auto& stat_name_storage : stat_names_) {
stat_name_storage->free(symbol_table_);
}
- store_.shutdownThreading();
if (tls_) {
tls_->shutdownGlobalThreading();
+ }
+ store_.shutdownThreading();
+ if (tls_) {
tls_->shutdownThread();
}
if (dispatcher_) {
diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc
index 6d8de94d04..412dfb9539 100644
--- a/test/common/stats/thread_local_store_test.cc
+++ b/test/common/stats/thread_local_store_test.cc
@@ -69,6 +69,10 @@ public:
store_->addSink(sink_);
}
+ ~StatsThreadLocalStoreTest() {
+ tls_.shutdownGlobalThreading();
+ }
+
void resetStoreWithAlloc(Allocator& alloc) {
store_ = std::make_unique<ThreadLocalStoreImpl>(alloc);
store_->addSink(sink_);
@@ -128,6 +132,7 @@ public:
}
void TearDown() override {
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -318,6 +323,7 @@ TEST_F(StatsThreadLocalStoreTest, Tls) {
EXPECT_EQ(&t1, store_->textReadouts().front().get()); // front() ok when size()==1
EXPECT_EQ(2UL, store_->textReadouts().front().use_count());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
@@ -415,6 +421,7 @@ TEST_F(StatsThreadLocalStoreTest, BasicScope) {
Stats::Histogram::Unit::Unspecified));
}
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
scope1->deliverHistogramToSinks(h1, 100);
scope1->deliverHistogramToSinks(h2, 200);
@@ -460,6 +467,7 @@ TEST_F(StatsThreadLocalStoreTest, HistogramScopeOverlap) {
EXPECT_EQ(0, store_->histograms().size());
EXPECT_EQ(0, numTlsHistograms());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
store_->histogramFromString("histogram_after_shutdown", Histogram::Unit::Unspecified);
@@ -476,6 +484,7 @@ TEST_F(StatsThreadLocalStoreTest, SanitizePrefix) {
Counter& c1 = scope1->counterFromString("c1");
EXPECT_EQ("scope1___foo.c1", c1.name());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -506,6 +515,7 @@ TEST_F(StatsThreadLocalStoreTest, ScopeDelete) {
EXPECT_EQ(1L, c1.use_count());
c1.reset();
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -541,6 +551,7 @@ TEST_F(StatsThreadLocalStoreTest, NestedScopes) {
TextReadout& t1 = scope2->textReadoutFromString("some_string");
EXPECT_EQ("scope1.foo.some_string", t1.name());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -605,6 +616,7 @@ TEST_F(StatsThreadLocalStoreTest, OverlappingScopes) {
EXPECT_EQ("abc", t2.value());
EXPECT_EQ(1UL, store_->textReadouts().size());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -650,6 +662,7 @@ TEST_F(StatsThreadLocalStoreTest, TextReadoutAllLengths) {
t.set("");
EXPECT_EQ("", t.value());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -805,6 +818,7 @@ TEST_F(StatsMatcherTLSTest, TestNoOpStatImpls) {
store_->histogramFromString("noop_histogram_2", Stats::Histogram::Unit::Unspecified);
EXPECT_EQ(&noop_histogram, &noop_histogram_2);
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
}
@@ -919,6 +933,7 @@ TEST_F(StatsMatcherTLSTest, TestExclusionRegex) {
EXPECT_EQ("", invalid_string_2.value());
// Expected to free lowercase_counter, lowercase_gauge, valid_counter, valid_gauge
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
}
@@ -938,6 +953,7 @@ public:
}
~RememberStatsMatcherTest() override {
+ tls_.shutdownGlobalThreading();
store_.shutdownThreading();
tls_.shutdownThread();
}
@@ -1111,6 +1127,7 @@ TEST_F(StatsThreadLocalStoreTest, RemoveRejectedStats) {
EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), 42));
histogram.recordValue(42);
textReadout.set("fortytwo");
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -1127,6 +1144,7 @@ TEST_F(StatsThreadLocalStoreTest, NonHotRestartNoTruncation) {
// This works fine, and we can find it by its long name because heap-stats do not
// get truncated.
EXPECT_NE(nullptr, TestUtility::findCounter(*store_, name_1).get());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -1143,6 +1161,7 @@ protected:
~StatsThreadLocalStoreTestNoFixture() override {
if (threading_enabled_) {
+ tls_.shutdownGlobalThreading();
store_.shutdownThreading();
tls_.shutdownThread();
}
@@ -1189,6 +1208,7 @@ TEST_F(StatsThreadLocalStoreTest, ShuttingDown) {
store_->counterFromString("c1");
store_->gaugeFromString("g1", Gauge::ImportMode::Accumulate);
store_->textReadoutFromString("t1");
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
store_->counterFromString("c2");
store_->gaugeFromString("g2", Gauge::ImportMode::Accumulate);
@@ -1208,6 +1228,7 @@ TEST_F(StatsThreadLocalStoreTest, ShuttingDown) {
EXPECT_EQ(2L, TestUtility::findGauge(*store_, "g2").use_count());
EXPECT_EQ(2L, TestUtility::findTextReadout(*store_, "t2").use_count());
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -1222,6 +1243,7 @@ TEST_F(StatsThreadLocalStoreTest, MergeDuringShutDown) {
EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 1));
h1.recordValue(1);
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
// Validate that merge callback is called during shutdown and there is no ASSERT.
@@ -1229,6 +1251,7 @@ TEST_F(StatsThreadLocalStoreTest, MergeDuringShutDown) {
store_->mergeHistograms([&merge_called]() -> void { merge_called = true; });
EXPECT_TRUE(merge_called);
+ tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
@@ -1243,6 +1266,7 @@ TEST(ThreadLocalStoreThreadTest, ConstructDestruct) {
store.initializeThreading(*dispatcher, tls);
{ ScopePtr scope1 = store.createScope("scope1."); }
+ tls.shutdownGlobalThreading();
store.shutdownThreading();
}
@@ -1502,15 +1526,7 @@ protected:
}
~ThreadLocalRealThreadsTestBase() override {
- {
- BlockingBarrier blocking_barrier(1);
- main_dispatcher_->post(blocking_barrier.run([this]() {
- store_->shutdownThreading();
- tls_->shutdownGlobalThreading();
- tls_->shutdownThread();
- }));
- }
-
+ shutdownThreading();
for (Event::DispatcherPtr& dispatcher : thread_dispatchers_) {
dispatcher->post([&dispatcher]() { dispatcher->exit(); });
}
@@ -1527,6 +1543,17 @@ protected:
main_thread_->join();
}
+ void shutdownThreading() {
+ BlockingBarrier blocking_barrier(1);
+ main_dispatcher_->post(blocking_barrier.run([this]() {
+ if (!tls_->isShutdown()) {
+ tls_->shutdownGlobalThreading();
+ }
+ store_->shutdownThreading();
+ tls_->shutdownThread();
+ }));
+ }
+
void workerThreadFn(uint32_t thread_index, BlockingBarrier& blocking_barrier) {
thread_dispatchers_[thread_index] =
api_->allocateDispatcher(absl::StrCat("test_worker_", thread_index));
@@ -1766,8 +1793,7 @@ TEST_F(HistogramThreadTest, ScopeOverlap) {
EXPECT_EQ(0, store_->histograms().size());
EXPECT_EQ(0, numTlsHistograms());
- store_->shutdownThreading();
-
+ shutdownThreading();
store_->histogramFromString("histogram_after_shutdown", Histogram::Unit::Unspecified);
}
diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h
index e735c543b0..6308256225 100644
--- a/test/mocks/thread_local/mocks.h
+++ b/test/mocks/thread_local/mocks.h
@@ -22,9 +22,12 @@ public:
// Server::ThreadLocal
MOCK_METHOD(SlotPtr, allocateSlot, ());
MOCK_METHOD(void, registerThread, (Event::Dispatcher & dispatcher, bool main_thread));
- MOCK_METHOD(void, shutdownGlobalThreading, ());
+ void shutdownGlobalThreading() override {
+ shutdown_ = true;
+ }
MOCK_METHOD(void, shutdownThread, ());
MOCK_METHOD(Event::Dispatcher&, dispatcher, ());
+ bool isShutdown() const override { return shutdown_; }
SlotPtr allocateSlot_() { return SlotPtr{new SlotImpl(*this, current_slot_++)}; }
void runOnAllThreads1_(Event::PostCb cb) { cb(); }
diff --git a/test/server/admin/stats_handler_test.cc b/test/server/admin/stats_handler_test.cc
index 1059d642ed..ae3c11e422 100644
--- a/test/server/admin/stats_handler_test.cc
+++ b/test/server/admin/stats_handler_test.cc
@@ -33,6 +33,12 @@ public:
true /*pretty_print*/);
}
+ void shutdownThreading() {
+ tls_.shutdownGlobalThreading();
+ store_->shutdownThreading();
+ tls_.shutdownThread();
+ }
+
Stats::SymbolTableImpl symbol_table_;
NiceMock<Event::MockDispatcher> main_thread_dispatcher_;
NiceMock<ThreadLocal::MockInstance> tls_;
@@ -189,7 +195,8 @@ TEST_P(AdminStatsTest, StatsAsJson) {
})EOF";
EXPECT_THAT(expected_json, JsonStringEq(actual_json));
- store_->shutdownThreading();
+ shutdownThreading();
+ ENVOY_LOG_MISC(error, "end of StatsAsJson");
}
TEST_P(AdminStatsTest, UsedOnlyStatsAsJson) {
@@ -289,7 +296,7 @@ TEST_P(AdminStatsTest, UsedOnlyStatsAsJson) {
})EOF";
EXPECT_THAT(expected_json, JsonStringEq(actual_json));
- store_->shutdownThreading();
+ shutdownThreading();
}
TEST_P(AdminStatsTest, StatsAsJsonFilterString) {
@@ -391,7 +398,7 @@ TEST_P(AdminStatsTest, StatsAsJsonFilterString) {
})EOF";
EXPECT_THAT(expected_json, JsonStringEq(actual_json));
- store_->shutdownThreading();
+ shutdownThreading();
}
TEST_P(AdminStatsTest, UsedOnlyStatsAsJsonFilterString) {
@@ -502,7 +509,7 @@ TEST_P(AdminStatsTest, UsedOnlyStatsAsJsonFilterString) {
})EOF";
EXPECT_THAT(expected_json, JsonStringEq(actual_json));
- store_->shutdownThreading();
+ shutdownThreading();
}
INSTANTIATE_TEST_SUITE_P(IpVersions, AdminInstanceTest,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment