Created
February 9, 2022 06:11
-
-
Save NatalieWolfe/724fa4d44ec280fbe563bd64dbdd670c to your computer and use it in GitHub Desktop.
Example for combining Epoll and gRPC event queues.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package lw; | |
service GreeterService { | |
rpc SayHello (HelloRequest) returns (HelloResponse) {} | |
} | |
message HelloRequest { | |
string name = 1; | |
} | |
message HelloResponse { | |
string message = 1; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <sys/epoll.h> | |
#include <sys/timerfd.h> | |
#include <unistd.h> | |
#include <chrono> | |
#include <iostream> | |
#include <stdexcept> | |
#include <thread> | |
#include <vector> | |
#include "experimental/greeter_service.grpc.pb.h" | |
#include "grpcpp/alarm.h" | |
#include "grpcpp/grpcpp.h" | |
void epoll_timeout(int epoll, void* data) { | |
int timer = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); | |
itimerspec spec{ | |
.it_interval = {0}, | |
.it_value = { | |
.tv_sec = 1, | |
.tv_nsec = 0 | |
} | |
}; | |
timerfd_settime(timer, /*flags=*/0, &spec, nullptr); | |
epoll_event ev = { | |
.events = EPOLLIN | EPOLLONESHOT, | |
.data = {.ptr = data} | |
}; | |
epoll_ctl(epoll, EPOLL_CTL_ADD, timer, &ev); | |
} | |
struct RequestProcessor { | |
explicit RequestProcessor(int epoll): | |
epoll{epoll}, | |
responder{&context} | |
{} | |
void grpc_queue_tick() { | |
if (state == 1) { | |
std::cout << "Server received request from " << req.name() << std::endl; | |
state = 2; | |
epoll_timeout(epoll, (void*)this); | |
} else if (state == 2) { | |
std::cout << "Epoll completed, responding to message." << std::endl; | |
state = 3; | |
res.set_message("Hello, " + req.name()); | |
responder.Finish(res, /*status=*/{}, (void*)this); | |
} else if (state == 3) { | |
std::cout << "State completed." << std::endl; | |
} else { | |
throw std::runtime_error("Unknown state: " + std::to_string(state)); | |
} | |
} | |
int epoll; | |
grpc::ServerContext context; | |
lw::HelloRequest req; | |
lw::HelloResponse res; | |
grpc::ServerAsyncResponseWriter<lw::HelloResponse> responder; | |
int state = 1; | |
}; | |
bool grpc_tick(grpc::ServerCompletionQueue& queue) { | |
void* tag = nullptr; | |
bool ok = false; | |
auto next_status = queue.AsyncNext(&tag, &ok, std::chrono::system_clock::now()); | |
if (next_status == grpc::CompletionQueue::GOT_EVENT) { | |
if (ok && tag) { | |
static_cast<RequestProcessor*>(tag)->grpc_queue_tick(); | |
} else { | |
std::cerr << "Not OK or bad tag: " << ok << "; " << tag << std::endl; | |
return false; | |
} | |
} | |
return next_status != grpc::CompletionQueue::SHUTDOWN; | |
} | |
bool tick_loops(int epoll, grpc::ServerCompletionQueue& queue) { | |
// Pump epoll events over to gRPC's completion queue. | |
epoll_event event{0}; | |
while (epoll_wait(epoll, &event, /*maxevents=*/1, /*timeout=*/0)) { | |
grpc::Alarm alarm; | |
alarm.Set(&queue, std::chrono::system_clock::now(), event.data.ptr); | |
if (!grpc_tick(queue)) return false; | |
} | |
// Make sure gRPC gets at least 1 tick. | |
return grpc_tick(queue); | |
} | |
void server() { | |
lw::GreeterService::AsyncService service; | |
grpc::ServerBuilder builder; | |
builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials()); | |
builder.RegisterService(&service); | |
auto q = builder.AddCompletionQueue(); | |
auto server = builder.BuildAndStart(); | |
int epoll = epoll_create1(/*flags=*/0); | |
RequestProcessor processor{epoll}; | |
service.RequestSayHello( | |
&processor.context, | |
&processor.req, | |
&processor.responder, | |
q.get(), | |
q.get(), | |
&processor | |
); | |
std::cout << "Server queue running." << std::endl; | |
while (tick_loops(epoll, *q)) { | |
// Example shutdown case. | |
if (processor.state == 3) break; | |
} | |
server->Shutdown(); | |
q->Shutdown(); | |
int i = 0; | |
void* tag = nullptr; | |
bool ok = false; | |
while (q->Next(&tag, &ok)) ++i; | |
std::cout << "Cleared " << i << " items from queue post-shutdown." << std::endl; | |
::close(epoll); | |
} | |
void client() { | |
auto channel = grpc::CreateChannel( | |
"localhost:50051", | |
grpc::InsecureChannelCredentials() | |
); | |
auto stub = lw::GreeterService::NewStub(channel); | |
grpc::ClientContext context; | |
lw::HelloRequest req; | |
lw::HelloResponse res; | |
req.set_name("Alice"); | |
std::cout << "Client making request." << std::endl; | |
grpc::Status status = stub->SayHello(&context, req, &res); | |
std::cout << "Client received: " << res.message() << std::endl; | |
} | |
int main() { | |
std::thread s{&server}; | |
std::thread c{&client}; | |
c.join(); | |
s.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment