Skip to content

Instantly share code, notes, and snippets.

@coryan
Last active March 11, 2019 17:15
Show Gist options
  • Save coryan/52d4bc27df30baf7ce28a8d4969c25aa to your computer and use it in GitHub Desktop.
Save coryan/52d4bc27df30baf7ce28a8d4969c25aa to your computer and use it in GitHub Desktop.
Demonstrates how gRPC alarms are not synchronized.
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
// Number of timer notifications delivered so far. It is incremented from every
// thread that runs Wrapper::Run() (via TimerOp::Notify) and read by main(), so
// it must be atomic to avoid a data race.
std::atomic<int> counter{0};
// Interface for an asynchronous operation tracked by Wrapper.
//
// Instances are held through std::shared_ptr<Op>, so the destructor is virtual
// to guarantee that derived classes are destroyed correctly through a base
// pointer.
class Op {
 public:
  virtual ~Op() = default;

  // Called by Wrapper::Run() once the operation's tag has been dequeued from
  // the completion queue and the operation removed from the pending set.
  virtual void Notify() = 0;

  // Called by the owner of the operation to request its cancellation.
  virtual void Cancel() = 0;
};
class TimerOp : public Op {
public:
TimerOp()
: alarm_(new grpc::Alarm) {}
void Set(grpc::CompletionQueue* cq, std::chrono::system_clock::time_point tp, void *tag) {
alarm_->Set(cq, tp, tag);
}
void Notify() override {
alarm_ = nullptr; // FIX1: remove this to fix the crash.
++counter;
}
void Cancel() override {
alarm_->Cancel();
}
private:
std::unique_ptr<grpc::Alarm> alarm_; // FIX2: protect this with a mutex
};
// Owns a gRPC completion queue and dispatches its events to the Op registered
// for each tag.
//
// Any number of threads may call Run() concurrently; the table of pending
// operations is protected by `mu_`.
class Wrapper {
 public:
  Wrapper() = default;

  // Event loop: blocks on the completion queue and notifies the operation
  // associated with each dequeued tag.
  //
  // The loop only ends when Next() returns false, i.e. after Shutdown() has
  // been called AND the queue has been fully drained. gRPC requires the queue
  // to be drained before it is destroyed, so the loop must not exit early.
  void Run() {
    void* tag = nullptr;
    bool ok = false;
    while (cq_.Next(&tag, &ok)) {
      auto const id = reinterpret_cast<std::intptr_t>(tag);
      std::shared_ptr<Op> op;
      {
        std::lock_guard<std::mutex> lk(mu_);
        auto const it = pending_ops_.find(id);
        if (it == pending_ops_.end()) {
          // Not one of our tags: skip it, but keep draining the queue.
          continue;
        }
        op = std::move(it->second);
        pending_ops_.erase(it);
      }
      // Notify outside the lock so that callbacks never run with `mu_` held.
      op->Notify();
    }
  }

  // Requests the shutdown of the completion queue. Threads in Run() return
  // once the remaining events have been delivered.
  void Shutdown() { cq_.Shutdown(); }

  // Creates a timer that fires at `deadline` and returns a handle that can be
  // used to cancel it.
  std::shared_ptr<Op> SetDeadline(
      std::chrono::system_clock::time_point deadline) {
    auto op = std::make_shared<TimerOp>();
    // Register before arming the alarm: the event may be delivered to a Run()
    // thread immediately, and the tag must already be in the table by then.
    void* tag = Register(op);
    op->Set(grpc_cq(), deadline, tag);
    return op;
  }

  // Returns the underlying completion queue (non-owning).
  grpc::CompletionQueue* grpc_cq() { return &cq_; }

 private:
  // Stores `op` in the pending table and returns the tag (the address of the
  // operation) under which it was stored.
  void* Register(std::shared_ptr<Op> op) {
    void* tag = op.get();
    std::lock_guard<std::mutex> lk(mu_);
    pending_ops_[reinterpret_cast<std::intptr_t>(tag)] = std::move(op);
    return tag;
  }

  grpc::CompletionQueue cq_;
  std::mutex mu_;
  std::map<std::intptr_t, std::shared_ptr<Op>> pending_ops_;  // GUARDED_BY(mu_)
};
// Starts a pool of threads draining the completion queue, then creates and
// immediately cancels 1000 timers, and finally reports how many notifications
// were delivered.
int main() {
  Wrapper cq;

  // hardware_concurrency() is allowed to return 0 when the value cannot be
  // determined; fall back to one core so at least some threads drain the
  // completion queue.
  std::size_t const cores = std::thread::hardware_concurrency();
  std::size_t const thread_count = 2 * (cores == 0 ? 1 : cores);

  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (std::size_t i = 0; i != thread_count; ++i) {
    threads.emplace_back([&cq] { cq.Run(); });
  }

  for (int i = 0; i != 1000; ++i) {
    // The deadline is "now", so the timer may fire on a Run() thread while
    // this thread is cancelling it.
    auto op = cq.SetDeadline(std::chrono::system_clock::now());
    op->Cancel();
  }

  cq.Shutdown();
  for (auto& t : threads) {
    t.join();
  }
  std::cout << "Counter = " << counter << "\n";
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment