Skip to content

Instantly share code, notes, and snippets.

@ppLorins
Created May 28, 2019 20:13
Show Gist options
  • Save ppLorins/d4484b61f12b2d87ac5c8d50d0808974 to your computer and use it in GitHub Desktop.
Save ppLorins/d4484b61f12b2d87ac5c8d50d0808974 to your computer and use it in GitHub Desktop.
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpc++/grpc++.h>

#include "helloworld.grpc.pb.h"
using grpc::Channel;
using grpc::ClientAsyncReaderWriter;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
// NOTE: This is a complex example for an asynchronous, bidirectional streaming
// client. For a simpler example, start with the
// greeter_client/greeter_async_client first.
// Asynchronous, bidirectional-streaming client for the Greeter SayHelloEx RPC.
//
// The constructor starts a background thread that drains the completion queue
// and then opens the stream. Every asynchronous operation is tagged with a
// `Type` value so the thread can tell which kind of operation completed.
class AsyncBidiGreeterClient {
  // Kinds of asynchronous operation; the value doubles as the completion tag.
  enum class Type {
    READ = 1,
    WRITE = 2,
    CONNECT = 3,
    WRITES_DONE = 4,
    FINISH = 5
  };

  // Packs an operation kind into the opaque tag pointer handed to gRPC.
  static void* ToTag(Type type) {
    return reinterpret_cast<void*>(static_cast<std::intptr_t>(type));
  }

  // Recovers the operation kind from a tag. std::intptr_t is used because it
  // is pointer-sized on every platform; `long` is narrower than a pointer on
  // LLP64 targets, where the pointer-to-long cast does not compile.
  static Type FromTag(void* tag) {
    return static_cast<Type>(reinterpret_cast<std::intptr_t>(tag));
  }

 public:
  // Creates the stub on `channel`, starts the completion-queue thread and
  // opens the bidirectional stream (completion is reported as CONNECT).
  explicit AsyncBidiGreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {
    grpc_thread_.reset(new std::thread([this] { GrpcThread(); }));
    stream_ = stub_->AsyncSayHelloEx(&context_, &cq_, ToTag(Type::CONNECT));
  }

  // Shuts the completion queue down and waits for the processing thread.
  ~AsyncBidiGreeterClient() {
    std::cout << "Shutting down client...." << std::endl;
    cq_.Shutdown();
    grpc_thread_->join();
  }

  // Similar to the async hello example in greeter_async_client but does not
  // wait for the response. Instead queues up a tag in the completion queue
  // that is notified when the operation completes.
  //
  // The special value "quit" half-closes the stream (WritesDone) instead of
  // sending a message.
  //
  // Always returns true, including for "quit", so a caller that waits for
  // `false` before stopping will keep running.
  // NOTE(review): returning false for "quit" would let the caller destroy the
  // client right away, and the destructor shuts the queue down while the
  // WRITES_DONE handler still has to issue Finish() -- confirm the intended
  // shutdown sequence before changing the return value.
  bool AsyncSayHello(const std::string& user) {
    if (user == "quit") {
      stream_->WritesDone(ToTag(Type::WRITES_DONE));
      return true;
    }

    // Data we are sending to the server.
    request_.set_name(user);

    // This is important: You can have at most one write or at most one read
    // at any given time. The throttling is performed by gRPC completion
    // queue. If you queue more than one write/read, the stream will crash.
    // Because this stream is bidirectional, you *can* have a single read
    // and a single write request queued for the same stream. Writes and reads
    // are independent of each other in terms of ordering/delivery.
    //std::cout << " ** Sending request: " << user << std::endl;
    stream_->Write(request_, ToTag(Type::WRITE));
    return true;
  }

 private:
  // Queues a read of the next server message into response_.
  void AsyncHelloRequestNextMessage() {
    // The tag is the link between our thread (main thread) and the completion
    // queue thread. The tag allows the completion queue to fan off
    // notification handlers for the specified read/write requests as they
    // are being processed by gRPC.
    stream_->Read(&response_, ToTag(Type::READ));
  }

  // Runs a gRPC completion-queue processing thread. Checks for 'Next' tag
  // and processes them until there are no more (or when the completion queue
  // is shutdown).
  void GrpcThread() {
    while (true) {
      void* got_tag;
      bool ok = false;
      // Block until the next result is available in the completion queue "cq".
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or the cq_ is shutting
      // down.
      if (!cq_.Next(&got_tag, &ok)) {
        std::cerr << "Client stream closed. Quitting" << std::endl;
        break;
      }

      // It's important to process all tags even if the ok is false. One might
      // want to deallocate memory that has been reinterpret_cast'ed to void*
      // when the tag got initialized. For our example, we cast an int to a
      // void*, so we don't have extra memory management to take care of.
      // Events with ok == false are dropped without further action.
      if (!ok) {
        continue;
      }

      //std::cout << std::endl << "**** Processing completion queue tag " << got_tag << std::endl;
      switch (FromTag(got_tag)) {
        case Type::READ:
          std::cout << "Read a new message:" << response_.message() << std::endl;
          break;
        case Type::WRITE:
          std::cout << "Sent message :" << request_.name() << std::endl;
          // Each completed write is followed by exactly one read.
          AsyncHelloRequestNextMessage();
          break;
        case Type::CONNECT:
          std::cout << "Server connected." << std::endl;
          break;
        case Type::WRITES_DONE:
          std::cout << "writesdone sent,sleeping 5s" << std::endl;
          // Ask for the final status now that our side is half-closed.
          stream_->Finish(&finish_status_, ToTag(Type::FINISH));
          break;
        case Type::FINISH:
          std::cout << "Client finish status:" << finish_status_.error_code() << ", msg:" << finish_status_.error_message() << std::endl;
          //context_.TryCancel();
          // Shutting the queue down makes Next() return false, ending this
          // thread once the queue is drained.
          cq_.Shutdown();
          break;
        default:
          std::cerr << "Unexpected tag " << got_tag << std::endl;
          assert(false);
      }
    }
  }

  // Context for the client. It could be used to convey extra information to
  // the server and/or tweak certain RPC behaviors.
  ClientContext context_;

  // The producer-consumer queue we use to communicate asynchronously with the
  // gRPC runtime.
  CompletionQueue cq_;

  // Out of the passed in Channel comes the stub, stored here, our view of the
  // server's exposed services.
  std::unique_ptr<Greeter::Stub> stub_;

  // The bidirectional, asynchronous stream for sending/receiving messages.
  std::unique_ptr<ClientAsyncReaderWriter<HelloRequest, HelloReply>> stream_;

  // Protobufs that hold the outgoing request and the incoming response. In
  // real clients and servers, the memory management would be a bit more
  // complex as the thread that fills in the response should take care of
  // concurrency as well as memory management.
  HelloRequest request_;
  HelloReply response_;

  // Thread that processes the gRPC completion queue tags.
  std::unique_ptr<std::thread> grpc_thread_;

  // Finish status when the client is done with the stream.
  grpc::Status finish_status_ = grpc::Status::OK;
};
// Client entry point: connects to localhost:50051 and sends every
// whitespace-delimited token read from stdin over the stream. argc/argv are
// unused.
int main(int argc, char** argv) {
  AsyncBidiGreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  std::string text;
  // Stop once stdin is exhausted or fails. Without this check the loop would
  // spin forever at end-of-input, calling AsyncSayHello with stale text and
  // queueing operations on the stream back to back.
  //std::cout << "Enter text (type quit to end): ";
  while (std::cin >> text) {
    // Async RPC call that sends a message and awaits a response.
    if (!greeter.AsyncSayHello(text)) {
      std::cout << "Quitting." << std::endl;
      break;
    }
  }
  return 0;
}
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <memory>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <random>
#include <atomic>
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include "helloworld.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerCompletionQueue;
using grpc::Status;
using grpc::StatusCode;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
// Number of worker threads that poll completion queues (set from --thread=).
int g_thread_num = 1;
// Number of server completion queues to create (set from --cq=).
int g_cq_num = 1;
// Number of CallDataUnary/CallDataBidi pairs created up front for each worker
// thread (set from --pool=).
int g_pool = 1;
// TCP port the server listens on (set from --port=).
int g_port = 50051;
// Not referenced anywhere else in the visible code.
// NOTE(review): presumably a leftover from an instance-pool experiment --
// confirm before removing.
std::atomic<void*>** g_instance_pool = nullptr;
// Common state and interface for one RPC handled by the asynchronous server.
// Objects of derived classes are used as completion-queue tags; the worker
// loop casts each tag back to CallDataBase* and calls Proceed().
class CallDataBase {
 public:
  // service: the async service the derived class registers its request with.
  // cq:      the completion queue on which this call's events are delivered.
  CallDataBase(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq) {}

  // Polymorphic base: a virtual destructor keeps destruction through a
  // CallDataBase pointer well defined.
  virtual ~CallDataBase() = default;

  // Advances the call's state machine for one completion-queue event.
  // ok: the success flag reported by the completion queue for that event.
  virtual void Proceed(bool ok) = 0;

 protected:
  // The means of communication with the gRPC runtime for an asynchronous
  // server.
  Greeter::AsyncService* service_;

  // The producer-consumer queue used for asynchronous server notifications.
  ServerCompletionQueue* cq_;

  // Context for the rpc, allowing to tweak aspects of it such as the use
  // of compression, authentication, as well as to send metadata back to the
  // client.
  ServerContext ctx_;

  // What we get from the client.
  HelloRequest request_;

  // What we send back to the client.
  HelloReply reply_;
};
class CallDataUnary : CallDataBase {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallDataUnary(Greeter::AsyncService* service, ServerCompletionQueue* cq) : CallDataBase(service,cq),responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallDataUnary
// instances can serve different requests concurrently), in this case
// the memory address of this CallDataUnary instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, (void*)this);
status_ = PROCESS;
}
void Proceed(bool ok) {
if (status_ == PROCESS) {
// Spawn a new CallDataUnary instance to serve new clients while we process
// the one for this CallDataUnary. The instance will deallocate itself as
// part of its FINISH state.
new CallDataUnary(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(reply_, Status::OK, (void*)this);
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallDataUnary).
delete this;
}
}
private:
// The means to get back to the client.
ServerAsyncResponseWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
};
// State for one bidirectional-streaming SayHelloEx call. The object is its own
// completion-queue tag: every event carrying this tag ends up in Proceed(),
// which advances the state machine held in status_.
class CallDataBidi : CallDataBase {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallDataBidi(Greeter::AsyncService* service, ServerCompletionQueue* cq) : CallDataBase(service,cq),rw_(&ctx_){
// Invoke the serving logic right away.
status_ = BidiStatus::CONNECT;
// The done-notification and the stream request share the same tag (this), so
// Proceed() cannot tell from the tag alone which of the two events fired.
ctx_.AsyncNotifyWhenDone((void*)this);
service_->RequestSayHelloEx(&ctx_, &rw_, cq_, cq_, (void*)this);
}
// Handles one completion event for this call.
// ok: success flag reported by the completion queue; it is only examined in
//     the READ state.
void Proceed(bool ok) {
// Serialises event handling for this call.
// NOTE(review): presumably needed because several worker threads can poll the
// same completion queue -- confirm.
std::unique_lock<std::mutex> _wlock(this->m_mutex);
switch (status_) {
case BidiStatus::READ:
// A failed read is taken to mean the client has ended its side of the stream
// (by a 'writedone' or 'finish' call), so the call is finished with a test
// status.
if (!ok) {
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this << " CQ returned false." << std::endl;
Status _st(StatusCode::OUT_OF_RANGE,"test error msg");
rw_.Finish(_st,(void*)this);
status_ = BidiStatus::DONE;
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this << " after call Finish(), cancelled:" << this->ctx_.IsCancelled() << std::endl;
break;
}
// A message arrived: answer it with a fixed reply and wait for the write to
// complete.
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this << " Read a new message:" << request_.name() << std::endl;
reply_.set_message("arthur");
rw_.Write(reply_, (void*)this);
status_ = BidiStatus::WRITE;
break;
case BidiStatus::WRITE:
// The reply has been written: go back to reading the next request.
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this << " Written a message:" << reply_.message() << std::endl;
rw_.Read(&request_, (void*)this);
status_ = BidiStatus::READ;
break;
case BidiStatus::CONNECT:
// A stream was accepted: create a fresh instance to accept the next one, then
// start reading from this stream.
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this << " connected:" << std::endl;
new CallDataBidi(service_, cq_);
rw_.Read(&request_, (void*)this);
status_ = BidiStatus::READ;
break;
case BidiStatus::DONE:
// First event after Finish(): only move on to FINISH, do not delete yet.
// NOTE(review): presumably two events are expected after Finish() (its own
// completion and the AsyncNotifyWhenDone notification), in either order --
// confirm.
std::cout << "thread:" << std::this_thread::get_id() << " tag:" << this
<< " Server done, cancelled:" << this->ctx_.IsCancelled() << std::endl;
status_ = BidiStatus::FINISH;
break;
case BidiStatus::FINISH:
std::cout << "thread:" << std::this_thread::get_id() << "tag:" << this << " Server finish, cancelled:" << this->ctx_.IsCancelled() << std::endl;
// The mutex is a member of this object, so release it before the object is
// destroyed.
_wlock.unlock();
delete this;
break;
default:
std::cerr << "Unexpected tag " << int(status_) << std::endl;
assert(false);
}
}
private:
// The means to get back to the client: writes HelloReply, reads HelloRequest.
ServerAsyncReaderWriter<HelloReply,HelloRequest> rw_;
// Let's implement a tiny state machine with the following states. Each value
// names the operation whose completion event is expected next (DONE and
// FINISH are the two steps taken after Finish() has been called).
enum class BidiStatus { READ = 1, WRITE = 2, CONNECT = 3, DONE = 4, FINISH = 5 };
BidiStatus status_;
// Guards status_ and the stream operations issued from Proceed().
std::mutex m_mutex;
};
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
for (const auto& _cq : m_cq)
_cq->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
std::string server_address("0.0.0.0:" + std::to_string(g_port));
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
for (int i = 0; i < g_cq_num; ++i) {
//cq_ = builder.AddCompletionQueue();
m_cq.emplace_back(builder.AddCompletionQueue());
}
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
std::vector<std::thread*> _vec_threads;
for (int i = 0; i < g_thread_num; ++i) {
int _cq_idx = i % g_cq_num;
for (int j = 0; j < g_pool; ++j) {
new CallDataUnary(&service_, m_cq[_cq_idx].get());
new CallDataBidi(&service_, m_cq[_cq_idx].get());
}
_vec_threads.emplace_back(new std::thread(&ServerImpl::HandleRpcs, this, _cq_idx));
}
std::cout << g_thread_num << " working aysnc threads spawned" << std::endl;
for (const auto& _t : _vec_threads)
_t->join();
}
private:
// Class encompasing the state and logic needed to serve a request.
// This can be run in multiple threads if needed.
void HandleRpcs(int cq_idx) {
// Spawn a new CallDataUnary instance to serve new clients.
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallDataUnary instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
//GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(m_cq[cq_idx]->Next(&tag, &ok));
CallDataBase* _p_ins = (CallDataBase*)tag;
_p_ins->Proceed(ok);
}
}
std::vector<std::unique_ptr<ServerCompletionQueue>> m_cq;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
// Extracts the value part of a "--name=value" command-line argument.
//
// argv: a single command-line argument (one C string, not the argv array).
// para: the prefix the argument must start with, e.g. "--thread=".
//
// Returns a pointer into `argv` just past the prefix, or nullptr (after
// printing a diagnostic) when either pointer is null or `argv` does not start
// with `para`.
const char* ParseCmdPara( char* argv,const char* para) {
  if (argv == nullptr || para == nullptr) {
    std::printf("para error: missing argument or expected prefix \n");
    return nullptr;
  }

  const std::size_t prefix_len = std::strlen(para);
  // Anchor the match at the start of the argument: a substring search would
  // accept "x--thread=1" and silently ignore the text before the prefix.
  if (std::strncmp(argv, para, prefix_len) != 0) {
    std::printf("para error argv[%s] should be %s \n",argv,para);
    return nullptr;
  }
  return argv + prefix_len;
}
// Server entry point. Expects exactly four arguments, in this order:
//   --thread=N --cq=N --pool=N --port=N
// Each value must be a positive integer. Returns 1 on a usage or parse error;
// otherwise runs the server until its workers return.
int main(int argc, char** argv) {
  if (argc != 5) {
    std::cout << "Usage:./program --thread=xx --cq=xx --pool=xx --port=xx" << std::endl;
    return 1;
  }

  const char* const flags[] = {"--thread=", "--cq=", "--pool=", "--port="};
  int* const targets[] = {&g_thread_num, &g_cq_num, &g_pool, &g_port};
  for (int i = 0; i < 4; ++i) {
    // ParseCmdPara prints its own diagnostic and returns nullptr on a
    // mismatch; that pointer must never reach atoi.
    const char* value = ParseCmdPara(argv[i + 1], flags[i]);
    if (value == nullptr) {
      return 1;
    }
    const int parsed = std::atoi(value);
    // atoi yields 0 for non-numeric text; zero or negative counts/ports would
    // leave the server without queues, workers or a usable port.
    if (parsed <= 0) {
      std::cout << "para error " << flags[i] << " needs a positive integer, got [" << value << "]" << std::endl;
      return 1;
    }
    *targets[i] = parsed;
  }

  ServerImpl server;
  server.Run();
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment