Skip to content

Instantly share code, notes, and snippets.

@ppLorins
Created July 18, 2019 07:22
Show Gist options
  • Save ppLorins/6e4cc625c2c5b8fd16ced3172b1ada09 to your computer and use it in GitHub Desktop.
Save ppLorins/6e4cc625c2c5b8fd16ced3172b1ada09 to your computer and use it in GitHub Desktop.
An example of a bidirectional-streaming, asynchronous gRPC C++ server.
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <atomic>
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
// Identifies which asynchronous operation a completion-queue event belongs to.
// The enumerator value is smuggled through the queue as the `void*` tag, so
// the numeric values below must stay stable (READ starts at 1).
enum class Type {
  READ = 1,  // 1: tag passed to stream Read()
  WRITE,     // 2: tag passed to stream Write()
  CONNECT,   // 3: tag passed to RequestSayHelloEx()
  DONE,      // 4: tag passed to ServerContext::AsyncNotifyWhenDone()
  FINISH     // 5: tag passed to stream Finish()
};
// NOTE: This is a complex example of an asynchronous, bidirectional streaming
// server. If you are new to the async API, read the simpler
// greeter_server/greeter_async_server examples first.
// Most of the logic is similar to AsyncBidiGreeterClient, so follow that class
// for detailed comments. Two main differences between the server and the client
// are: (a) Server cannot initiate a connection, so it first waits for a
// 'connection'. (b) Server can handle multiple streams at the same time, so
// the completion queue/server have a longer lifetime than the client(s).
class AsyncBidiGreeterServer {
public:
AsyncBidiGreeterServer() {
// In general avoid setting up the server in the main thread (specifically,
// in a constructor-like function such as this). We ignore this in the
// context of an example.
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
// This initiates a single stream for a single client. To allow multiple
// clients in different threads to connect, simply 'request' from the
// different threads. Each stream is independent but can use the same
// completion queue/context objects.
stream_.reset(
new ServerAsyncReaderWriter<HelloReply, HelloRequest>(&context_));
service_.RequestSayHelloEx(&context_, stream_.get(), cq_.get(), cq_.get(),
reinterpret_cast<void*>(Type::CONNECT));
// This is important as the server should know when the client is done.
context_.AsyncNotifyWhenDone(reinterpret_cast<void*>(Type::DONE));
grpc_thread_.reset(new std::thread(
(std::bind(&AsyncBidiGreeterServer::GrpcThread, this))));
std::cout << "Server listening on " << server_address << std::endl;
}
void SetResponse(const std::string& response) {
if (response == "quit" && IsRunning()) {
stream_->Finish(grpc::Status::CANCELLED,
reinterpret_cast<void*>(Type::FINISH));
}
response_str_ = response;
}
~AsyncBidiGreeterServer() {
std::cout << "Shutting down server...." << std::endl;
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
grpc_thread_->join();
}
bool IsRunning() const { return is_running_; }
private:
void AsyncWaitForHelloRequest() {
if (IsRunning()) {
// In the case of the server, we wait for a READ first and then write a
// response. A server cannot initiate a connection so the server has to
// wait for the client to send a message in order for it to respond back.
stream_->Read(&request_, reinterpret_cast<void*>(Type::READ));
}
}
void AsyncHelloSendResponse() {
std::cout << " ** Handling request: " << request_.name() << std::endl;
HelloReply response;
std::cout << " ** Sending response: " << response_str_ << std::endl;
response.set_message(response_str_);
stream_->Write(response, reinterpret_cast<void*>(Type::WRITE));
}
void GrpcThread() {
while (true) {
void* got_tag = nullptr;
bool ok = false;
if (!cq_->Next(&got_tag, &ok)) {
std::cerr << "Server stream closed. Quitting" << std::endl;
break;
}
//assert(ok);
if (ok) {
std::cout << std::endl
<< "**** Processing completion queue tag " << got_tag
<< std::endl;
switch (static_cast<Type>(reinterpret_cast<size_t>(got_tag))) {
case Type::READ:
std::cout << "Read a new message." << std::endl;
AsyncHelloSendResponse();
break;
case Type::WRITE:
std::cout << "Sending message (async)." << std::endl;
AsyncWaitForHelloRequest();
break;
case Type::CONNECT:
std::cout << "Client connected." << std::endl;
AsyncWaitForHelloRequest();
break;
case Type::DONE:
std::cout << "Server disconnecting." << std::endl;
is_running_ = false;
break;
case Type::FINISH:
std::cout << "Server quitting." << std::endl;
break;
default:
std::cerr << "Unexpected tag " << got_tag << std::endl;
assert(false);
}
}
}
}
private:
HelloRequest request_;
std::string response_str_ = "Default server response";
ServerContext context_;
Greeter::AsyncService service_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<Server> server_;
std::unique_ptr<ServerAsyncReaderWriter<HelloReply, HelloRequest>> stream_;
std::unique_ptr<std::thread> grpc_thread_;
bool is_running_ = true;
};
// Runs the example server and feeds it responses typed on stdin, one
// whitespace-delimited token at a time, until the server stops running or
// stdin is exhausted.
int main(int argc, char** argv) {
  (void)argc;  // Command-line arguments are not used by this example.
  (void)argv;
  AsyncBidiGreeterServer server;
  std::string response;
  while (server.IsRunning()) {
    std::cout << "Enter next set of responses (type quit to end): ";
    if (!(std::cin >> response)) {
      // EOF or a stream error: without this check the loop would spin forever,
      // re-sending the previous response on every iteration.
      std::cerr << std::endl << "Input closed; stopping server." << std::endl;
      break;
    }
    server.SetResponse(response);
  }
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment