Skip to content

Instantly share code, notes, and snippets.

@NatalieWolfe
Created February 9, 2022 06:11
Show Gist options
  • Save NatalieWolfe/724fa4d44ec280fbe563bd64dbdd670c to your computer and use it in GitHub Desktop.
Save NatalieWolfe/724fa4d44ec280fbe563bd64dbdd670c to your computer and use it in GitHub Desktop.
Example for combining Epoll and gRPC event queues.
syntax = "proto3";
package lw;
service GreeterService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>
#include <vector>
#include "experimental/greeter_service.grpc.pb.h"
#include "grpcpp/alarm.h"
#include "grpcpp/grpcpp.h"
void epoll_timeout(int epoll, void* data) {
int timer = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
itimerspec spec{
.it_interval = {0},
.it_value = {
.tv_sec = 1,
.tv_nsec = 0
}
};
timerfd_settime(timer, /*flags=*/0, &spec, nullptr);
epoll_event ev = {
.events = EPOLLIN | EPOLLONESHOT,
.data = {.ptr = data}
};
epoll_ctl(epoll, EPOLL_CTL_ADD, timer, &ev);
}
struct RequestProcessor {
explicit RequestProcessor(int epoll):
epoll{epoll},
responder{&context}
{}
void grpc_queue_tick() {
if (state == 1) {
std::cout << "Server received request from " << req.name() << std::endl;
state = 2;
epoll_timeout(epoll, (void*)this);
} else if (state == 2) {
std::cout << "Epoll completed, responding to message." << std::endl;
state = 3;
res.set_message("Hello, " + req.name());
responder.Finish(res, /*status=*/{}, (void*)this);
} else if (state == 3) {
std::cout << "State completed." << std::endl;
} else {
throw std::runtime_error("Unknown state: " + std::to_string(state));
}
}
int epoll;
grpc::ServerContext context;
lw::HelloRequest req;
lw::HelloResponse res;
grpc::ServerAsyncResponseWriter<lw::HelloResponse> responder;
int state = 1;
};
bool grpc_tick(grpc::ServerCompletionQueue& queue) {
void* tag = nullptr;
bool ok = false;
auto next_status = queue.AsyncNext(&tag, &ok, std::chrono::system_clock::now());
if (next_status == grpc::CompletionQueue::GOT_EVENT) {
if (ok && tag) {
static_cast<RequestProcessor*>(tag)->grpc_queue_tick();
} else {
std::cerr << "Not OK or bad tag: " << ok << "; " << tag << std::endl;
return false;
}
}
return next_status != grpc::CompletionQueue::SHUTDOWN;
}
bool tick_loops(int epoll, grpc::ServerCompletionQueue& queue) {
// Pump epoll events over to gRPC's completion queue.
epoll_event event{0};
while (epoll_wait(epoll, &event, /*maxevents=*/1, /*timeout=*/0)) {
grpc::Alarm alarm;
alarm.Set(&queue, std::chrono::system_clock::now(), event.data.ptr);
if (!grpc_tick(queue)) return false;
}
// Make sure gRPC gets at least 1 tick.
return grpc_tick(queue);
}
void server() {
lw::GreeterService::AsyncService service;
grpc::ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
builder.RegisterService(&service);
auto q = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
int epoll = epoll_create1(/*flags=*/0);
RequestProcessor processor{epoll};
service.RequestSayHello(
&processor.context,
&processor.req,
&processor.responder,
q.get(),
q.get(),
&processor
);
std::cout << "Server queue running." << std::endl;
while (tick_loops(epoll, *q)) {
// Example shutdown case.
if (processor.state == 3) break;
}
server->Shutdown();
q->Shutdown();
int i = 0;
void* tag = nullptr;
bool ok = false;
while (q->Next(&tag, &ok)) ++i;
std::cout << "Cleared " << i << " items from queue post-shutdown." << std::endl;
::close(epoll);
}
void client() {
auto channel = grpc::CreateChannel(
"localhost:50051",
grpc::InsecureChannelCredentials()
);
auto stub = lw::GreeterService::NewStub(channel);
grpc::ClientContext context;
lw::HelloRequest req;
lw::HelloResponse res;
req.set_name("Alice");
std::cout << "Client making request." << std::endl;
grpc::Status status = stub->SayHello(&context, req, &res);
std::cout << "Client received: " << res.message() << std::endl;
}
int main() {
std::thread s{&server};
std::thread c{&client};
c.join();
s.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment