Skip to content

Instantly share code, notes, and snippets.

@lieroz
Created December 23, 2020 15:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lieroz/6ab0b844eb659cd8d202783f467c4e3d to your computer and use it in GitHub Desktop.
gRPC C++ async api wrapper
#pragma once

#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

#include <google/protobuf/message.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>

#include <logger.h>
namespace grpcasyncservice
{
using OnCreate = std::function<void(grpc::ServerContext *,
google::protobuf::Message *,
grpc::internal::ServerAsyncStreamingInterface *,
std::shared_ptr<grpc::CompletionQueue>,
std::shared_ptr<grpc::ServerCompletionQueue>,
void *)>;
using OnWork = std::function<bool(grpc::ServerContext &,
google::protobuf::Message *,
grpc::internal::ServerAsyncStreamingInterface *,
void *)>;
using FactoryFunction = std::function<std::shared_ptr<google::protobuf::Message>()>;
using Cache = std::list<std::shared_ptr<google::protobuf::Message>>;
struct Method
{
Method(size_t cacheSize,
FactoryFunction requestFunc,
OnCreate _onCreate,
OnWork _onWork)
: onCreate(std::move(_onCreate)), onWork(std::move(_onWork))
{
for (size_t i = 0; i < cacheSize; ++i)
{
cache.push_back(requestFunc());
}
}
OnCreate onCreate;
OnWork onWork;
Cache cache;
std::shared_mutex lock;
};
class InvokeTag
{
public:
InvokeTag(std::shared_ptr<grpc::ServerCompletionQueue> _cq, Method &_method)
: cq(_cq), method(_method), state(State::CREATE), responder(&context)
{
}
void invoke()
{
switch (state)
{
case State::CREATE:
{
if (method.cache.empty())
{
grpc::Alarm alarm;
alarm.Set(cq.get(),
std::chrono::system_clock::now() + std::chrono::milliseconds(100), this);
}
else
{
state = State::WORK;
{
std::unique_lock lock(method.lock);
request = method.cache.front();
method.cache.pop_front();
}
method.onCreate(&context, request.get(), &responder, cq, cq, this);
}
break;
}
case State::WORK:
{
std::call_once(flag, [_cq = cq, _method = std::ref(method)]() {
(new InvokeTag(_cq, _method))->invoke();
});
if (method.onWork(context, request.get(), &responder, this))
{
state = State::FINISH;
}
break;
}
case State::FINISH:
{
request->Clear();
{
std::unique_lock lock(method.lock);
method.cache.push_back(request);
}
delete this;
break;
}
}
}
private:
enum class State
{
CREATE,
WORK,
FINISH,
};
private:
grpc::ServerContext context;
std::once_flag flag;
std::shared_ptr<grpc::ServerCompletionQueue> cq;
Method &method;
State state;
grpc::ServerAsyncWriter<google::protobuf::Message> responder;
std::shared_ptr<google::protobuf::Message> request;
};
class ServiceImpl
{
public:
ServiceImpl(std::shared_ptr<grpc::ServerCompletionQueue> _cq,
std::vector<std::shared_ptr<Method>> &methods)
: cq(_cq)
{
for (auto &method: methods)
{
(new InvokeTag(cq, *method))->invoke();
}
}
void start()
{
void *tag = nullptr;
bool ok = false;
while (cq->Next(&tag, &ok))
{
if (ok)
{
static_cast<InvokeTag *>(tag)->invoke();
}
}
}
private:
std::shared_ptr<grpc::ServerCompletionQueue> cq;
};
class AsyncServiceImpl final
{
public:
AsyncServiceImpl(size_t _threadsCount, grpc::ServerBuilder &_builder)
: threadsCount(_threadsCount), builder(_builder)
{
}
template<typename Request>
AsyncServiceImpl &registerMethod(size_t cacheSize, OnCreate onCreate, OnWork onWork)
{
methods.push_back(std::make_shared<Method>(
cacheSize, []() { return std::make_shared<Request>(); }, std::move(onCreate),
std::move(onWork)));
return *this;
}
AsyncServiceImpl &start()
{
for (size_t i = 0; i < threadsCount; ++i)
{
queues.push_back(builder.AddCompletionQueue());
}
server = builder.BuildAndStart();
for (auto cq: queues)
{
threads.emplace_back([this, cq]() { ServiceImpl(cq, methods).start(); });
}
return *this;
}
void stop()
{
server->Shutdown();
for (auto &cq: queues)
{
cq->Shutdown();
}
}
void wait()
{
for (auto &thr: threads)
{
thr.join();
}
}
private:
size_t threadsCount;
grpc::ServerBuilder &builder;
std::unique_ptr<grpc::Server> server;
std::vector<std::thread> threads;
std::vector<std::shared_ptr<Method>> methods;
std::vector<std::shared_ptr<grpc::ServerCompletionQueue>> queues;
};
} // namespace grpcasyncservice
@aliakseis
Copy link

Тоже игрался с grpc: https://github.com/aliakseis/grpc-demo

Не очень понял, а как вы в результате пофиксили проблему с аллокатором?

@lieroz
Copy link
Author

lieroz commented Dec 24, 2020

Мы по итогу заменили аллокатор на другой, а саму асинхронную версию использовали потому что думали что в ней не будет проблемы с постоянным созданием новых потоков. Только после того как залезли в код gRPC, поняли что простое использование асинхронного АПИ не решит нашу проблему. Так как синхронный и асинхронный сервера используют один и тот же код для управления потоками из класса ThreadManager.

За примеры спасибо!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment