Skip to content

Instantly share code, notes, and snippets.

@ppLorins
Created October 21, 2019 08:12
Show Gist options
  • Save ppLorins/942874631457a7f982411ea0f2b60048 to your computer and use it in GitHub Desktop.
gRPC broadcasting pattern memory leak (C++)
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <atomic>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "helloworld.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientAsyncReaderWriter;
using grpc::ClientAsyncResponseReader;
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerCompletionQueue;
using grpc::CompletionQueue;
using grpc::Status;
using grpc::StatusCode;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
// ---- Runtime configuration, set from command-line flags in main() ----
int g_thread_num = 1;
int g_cq_num = 1;
int g_ins_pool = 1;
int g_channel_pool = 1;
// Completion queue shared by all outbound (client-side) RPCs; polled by the
// dedicated client threads spawned in ServerImpl::Run().
CompletionQueue *g_client_cq;
// NOTE(review): declared but never referenced anywhere in this file —
// presumably left over from an earlier experiment; confirm before removing.
std::atomic<void*>** g_instance_pool = nullptr;
typedef std::shared_ptr<::grpc::Channel> ShpChannel;
typedef std::vector<ShpChannel> VecChannel;
typedef std::unique_ptr<VecChannel> UptrVecChannel;
// Backend addresses every incoming request is broadcast to.
std::vector<std::string> g_vec_backends{"192.168.0.100:50052","192.168.0.100:50053"};
// Owns a fixed pool of gRPC channels per backend address.
// Populated once at startup (see main); read-only afterwards, so the
// lookups below need no locking.
struct ChannelPool {
  ChannelPool() {
    for (const auto &_svr : g_vec_backends) {
      auto &_uptr_vec = this->m_backend_pool[_svr];
      _uptr_vec.reset(new VecChannel());
      _uptr_vec->reserve(g_channel_pool);
      for (int i = 0; i < g_channel_pool; ++i)
        _uptr_vec->emplace_back(grpc::CreateChannel(_svr, grpc::InsecureChannelCredentials()));
    }
  }
  // Returns a randomly chosen channel for `svr`, or an empty shared_ptr
  // when the backend is unknown.
  ShpChannel PickupOneChannel(const std::string &svr) {
    // Single map lookup — the original did find() followed by operator[].
    auto _it = m_backend_pool.find(svr);
    if (_it == m_backend_pool.cend())
      return ShpChannel();
    return (*_it->second)[this->GenerateRandom(0, g_channel_pool - 1)];
  }
  // Uniform random integer in [from, to].
  // The engine is constructed once per thread: building a std::random_device
  // and mt19937 on every call (as the original did) is expensive, and the
  // unsigned long distribution silently narrowed to uint32_t on return.
  uint32_t GenerateRandom(uint32_t from, uint32_t to) {
    static thread_local std::mt19937 _gen{std::random_device{}()};
    std::uniform_int_distribution<uint32_t> _dis(from, to);
    return _dis(_gen);
  }
  //Read only after initialized.
  std::map<std::string, UptrVecChannel> m_backend_pool;
} *g_channel;
// Common interface for every completion-queue tag object.  Each tag pushed
// onto a grpc CompletionQueue in this file is a CallDataBase*, and the
// polling loops dispatch through Proceed().
class CallDataBase {
public:
  CallDataBase() = default;
  // Polymorphic base: a virtual destructor makes deletion through a
  // CallDataBase* well-defined (the original omitted it).
  virtual ~CallDataBase() = default;
  // `ok` is the status bit returned by CompletionQueue::Next().
  virtual void Proceed(bool ok) = 0;
};
// Server-side state shared by every unary call handler: the service and
// completion queue it registers with, plus the per-call context and the
// request/reply message buffers.
class CallDataServerBase : public CallDataBase {
public:
  CallDataServerBase(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service),
        cq_(cq) {}

protected:
  Greeter::AsyncService* service_;  // not owned
  ServerCompletionQueue* cq_;       // not owned
  ServerContext ctx_;
  HelloRequest request_;
  HelloReply reply_;
};
class CallDataUnary;
class AsyncUnaryGreeterClient : public CallDataBase{
enum class ClientStatus {
PROCESS = 1,
FINISH = 2
};
public:
AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel, CompletionQueue *cq, CallDataUnary* pcd);
~AsyncUnaryGreeterClient();
void AsyncSayHello(const std::string& user);
void Proceed(bool ok);
private:
ClientContext context_;
ClientStatus status_;
std::unique_ptr<Greeter::Stub> stub_;
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> stream_;
CompletionQueue *cq_;
HelloRequest request_;
HelloReply response_;
grpc::Status finish_status_ = grpc::Status::OK;
CallDataUnary* m_parent_call_data = nullptr;
};
class CallDataUnary : CallDataServerBase {
public:
CallDataUnary(Greeter::AsyncService* service, ServerCompletionQueue* cq) : CallDataServerBase(service,cq),responder_(&ctx_), status_(CREATE) {
for (const auto &_svr : g_vec_backends) {
auto _shp_channel = g_channel->PickupOneChannel(_svr);
m_backends.emplace_back(new AsyncUnaryGreeterClient(_shp_channel, g_client_cq, this));
}
status_ = PROCESS;
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
this->m_done_counter.store(0);
//std::cout << "pos_1_1" << std::endl;
}
~CallDataUnary() {
//std::cout << "pos_1_2" << std::endl;
}
void NotifyOneDone() {
return ;
int _pre = m_done_counter.fetch_add(1);
if (_pre+1 == g_vec_backends.size()) {
//reply_.set_message(this->request_.name());
//status_ = FINISH;
//responder_.Finish(reply_, Status::OK, this);
//delete this;
//for (auto* _p_svr : m_backends)
// delete _p_svr;
}
}
void Proceed(bool ok) {
if (status_ == PROCESS) {
new CallDataUnary(service_, cq_);
for (auto* _p_svr : m_backends)
_p_svr->AsyncSayHello(request_.name());
reply_.set_message(this->request_.name());
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
std::list<AsyncUnaryGreeterClient*> m_backends;
std::atomic<int> m_done_counter;
ServerAsyncResponseWriter<HelloReply> responder_;
enum CallStatus { CREATE = 1, PROCESS, FINISH };
CallStatus status_;
};
// Binds this client to one backend channel and the shared client completion
// queue; `pcd` is the parent server call to notify when the RPC completes.
AsyncUnaryGreeterClient::AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel,
                                                 CompletionQueue *cq,
                                                 CallDataUnary* pcd)
    : stub_(Greeter::NewStub(channel)),
      cq_(cq),
      m_parent_call_data(pcd) {}
// Nothing to release explicitly: stub_ and stream_ are unique_ptrs and the
// client context tears itself down.
AsyncUnaryGreeterClient::~AsyncUnaryGreeterClient() = default;
// Starts the async unary SayHello RPC against this client's backend.
// NOTE(review): the statement order below is load-bearing — status_ must be
// PROCESS before Finish() registers `this` as the completion tag, because
// Proceed() can fire on another thread as soon as the tag is enqueued.
void AsyncUnaryGreeterClient::AsyncSayHello(const std::string& user) {
request_.set_name(user);
status_ = ClientStatus::PROCESS;
// PrepareAsyncSayHello creates the call, StartCall issues it, and Finish
// registers `this` as the tag for the reply + final status delivery.
stream_ = stub_->PrepareAsyncSayHello(&context_, request_, cq_);
stream_->StartCall();
stream_->Finish(&response_, &finish_status_, this);
}
// Completion handler for this client's Finish tag.  The object must always
// destroy itself here: the original returned early on !ok WITHOUT deleting,
// leaking one AsyncUnaryGreeterClient per failed backend call — this is the
// memory leak the gist demonstrates.
void AsyncUnaryGreeterClient::Proceed(bool ok) {
  if (!ok) {
    std::cout << "Unary client get non-ok result from peer:" << context_.peer() << std::endl;
    delete this;   // fix: release the call object on the failure path too
    return;
  }
  switch (status_) {
    case ClientStatus::PROCESS:
      // Successful completion: tell the parent, then self-destruct.
      m_parent_call_data->NotifyOneDone();
      delete this;
      break;
    default:
      std::cerr << "ClientUnexpected status:" << int(status_) << std::endl;
      assert(false);
  }
}
// Hosts the async gRPC server: owns the completion queues, spawns the
// polling threads, and seeds the initial CallDataUnary handlers.
class ServerImpl final {
 public:
  ~ServerImpl() {
    // Shut the server down first, then the completion queues it drains.
    server_->Shutdown();
    for (const auto& _cq : m_cq)
      _cq->Shutdown();
  }
  // Builds and starts the server, then blocks polling completion queues.
  // Never returns under normal operation (the polling loops run forever).
  void Run() {
    std::string server_address("0.0.0.0:50051");
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);
    for (int i = 0; i < g_cq_num; ++i)
      m_cq.emplace_back(builder.AddCompletionQueue());
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    // Fixed number of threads polling the shared client queue g_client_cq.
    auto _lambda = [&](CompletionQueue* cq) {
      void* tag;
      bool ok;
      while (true) {
        GPR_ASSERT(cq->Next(&tag, &ok));
        CallDataBase* _p_ins = (CallDataBase*)tag;
        _p_ins->Proceed(ok);
      }
    };
    // Own the threads by value: the original heap-allocated std::thread
    // objects via `new` and never joined or deleted the client threads.
    std::vector<std::thread> _vec_client_threads;
    for (int i = 0; i < 2; ++i)
      _vec_client_threads.emplace_back(_lambda, g_client_cq);
    std::vector<std::thread> _vec_threads;
    for (int i = 0; i < g_thread_num; ++i) {
      int _cq_idx = i % g_cq_num;
      // Pre-post a pool of request handlers on this queue; each handler
      // re-arms itself (see CallDataUnary::Proceed).
      for (int j = 0; j < g_ins_pool; ++j)
        new CallDataUnary(&service_, m_cq[_cq_idx].get());
      _vec_threads.emplace_back(&ServerImpl::HandleRpcs, this, _cq_idx);
    }
    std::cout << g_thread_num << " working async threads spawned" << std::endl;
    for (auto& _t : _vec_threads)
      _t.join();
    for (auto& _t : _vec_client_threads)
      _t.join();
  }
 private:
  // Drains one server completion queue, dispatching tags polymorphically.
  void HandleRpcs(int cq_idx) {
    void* tag;
    bool ok;
    while (true) {
      GPR_ASSERT(m_cq[cq_idx]->Next(&tag, &ok));
      CallDataBase* _p_ins = (CallDataBase*)tag;
      _p_ins->Proceed(ok);
    }
  }
  std::vector<std::unique_ptr<ServerCompletionQueue>> m_cq;
  Greeter::AsyncService service_;
  std::unique_ptr<Server> server_;
};
// Extracts the value that follows `para` inside `argv`
// (e.g. argv="--thread=4", para="--thread=" -> "4").
// Returns a pointer into `argv` just past the first occurrence of `para`,
// or nullptr (after printing a diagnostic) when `para` is absent.
// NOTE: callers must check for nullptr before handing the result to atoi.
const char* ParseCmdPara( char* argv,const char* para) {
  const char* p_target = std::strstr(argv, para);
  if (p_target == nullptr) {
    printf("para error argv[%s] should be %s \n",argv,para);
    return nullptr;
  }
  return p_target + std::strlen(para);
}
// Parses one "--flag=value" argument into an int, exiting on malformed
// input.  The original fed ParseCmdPara's possible nullptr straight into
// std::atoi, which is undefined behavior.
static int ParseIntArg(char* argv, const char* para) {
  const char* _value = ParseCmdPara(argv, para);
  if (_value == nullptr)
    std::exit(1);   // diagnostic already printed by ParseCmdPara
  return std::atoi(_value);
}
// Entry point: reads the four tuning flags, builds the global client queue
// and channel pool, then runs the server (blocks forever).
int main(int argc, char** argv) {
  if (argc != 5) {
    std::cout << "Usage:./program --thread=xx --cq=xx --pool=xx --channel_pool=xx" << std::endl;
    return 1;   // argument error: signal failure instead of the original 0
  }
  g_client_cq = new CompletionQueue();
  g_thread_num = ParseIntArg(argv[1], "--thread=");
  g_cq_num = ParseIntArg(argv[2], "--cq=");
  g_ins_pool = ParseIntArg(argv[3], "--pool=");
  g_channel_pool = ParseIntArg(argv[4], "--channel_pool=");
  g_channel = new ChannelPool();
  ServerImpl server;
  server.Run();
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment