Skip to content

Instantly share code, notes, and snippets.

@ppLorins
Created July 19, 2019 09:37
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ppLorins/f30e6a2e14c3738288200b43a803b122 to your computer and use it in GitHub Desktop.
Save ppLorins/f30e6a2e14c3738288200b43a803b122 to your computer and use it in GitHub Desktop.
An example of an asynchronous gRPC C++ client.
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <chrono>
#include <climits>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "helloworld.grpc.pb.h"
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
class GreeterClient {
public:
explicit GreeterClient(const std::string &addr,CompletionQueue* in_cq,uint32_t count) : m_total(count) {
//auto shp_channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
auto _channel_args = ::grpc::ChannelArguments();
auto _tp = std::chrono::system_clock::now();
std::chrono::seconds _sec = std::chrono::duration_cast<std::chrono::seconds>(_tp.time_since_epoch());
std::string _key = "key_" + std::to_string(_sec.count());
std::string _val = "val_" + std::to_string(_sec.count());
_channel_args.SetString(_key,_val);
auto shp_channel = ::grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(),
_channel_args);
stub_ = Greeter::NewStub(shp_channel);
this->cq_ = in_cq;
//this->cq_ = new CompletionQueue();
}
// Assembles the client's payload and sends it to the server.
void SayHello(const std::string& user,int idx) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(std::to_string(idx));
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
std::chrono::time_point<std::chrono::system_clock> _deadline = std::chrono::system_clock::now()
+ std::chrono::milliseconds(3100);
call->context.set_deadline(_deadline);
// stub_->PrepareAsyncSayHello() creates an RPC object, returning
// an instance to store in "call" but does not actually start the RPC
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader = stub_->PrepareAsyncSayHello(&call->context, request, cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call object.
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc() {
//std::cout << "thread " << std::this_thread::get_id() << " in.." << std::endl;;
void* got_tag;
bool ok = false;
uint32_t _counter = 0;
auto _start = std::chrono::steady_clock::now();
std::cout << "thread " << std::this_thread::get_id() << " start timer" << std::endl;;
// Block until the next result is available in the completion queue "cq".
while (cq_->Next(&got_tag, &ok)) {
//std::cout << "before counter:" << _counter << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(2));
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (!call->status.ok()) {
std::cout << call->status.error_code() << ",msg:" << call->status.error_message();
GPR_ASSERT(false);
}
/*
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
*/
// Once we're complete, deallocate the call object.
delete call;
if (++_counter >= m_total)
break;
}
auto _end = std::chrono::steady_clock::now();
auto _ms = std::chrono::duration_cast<std::chrono::milliseconds>(_end - _start);
std::cout << "thread " << std::this_thread::get_id() << " inner time cost:" << _ms.count() << std::endl;
uint32_t _throughput = m_total / float(_ms.count()) * 1000;
std::cout << "thread " << std::this_thread::get_id() << " inner throughput : " << _throughput << std::endl;
}
private:
// struct for keeping state and data information
struct AsyncClientCall {
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// Storage for the status of the RPC upon completion.
Status status;
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
};
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<Greeter::Stub> stub_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue* cq_;
uint32_t m_total;
};
int main(int argc, char** argv) {
uint32_t count = 50000;
if (argc != 4) {
std::cout << "Usage:./program --count=xx --thread=xx --addr=xx";
return 0;
}
const char * target_str = "--count=";
auto p_target = std::strstr(argv[1],target_str);
if (p_target == nullptr) {
printf("para error argv[1] should be --count=xx \n");
return 0;
}
p_target += std::strlen(target_str);
count = std::atoi(p_target);
uint32_t thread_num = 1;
target_str = "--thread=";
p_target = std::strstr(argv[2],target_str);
if (p_target == nullptr) {
printf("para error argv[2] should be --thread=xx \n");
return 0;
}
p_target += std::strlen(target_str);
thread_num = std::atoi(p_target);
std::string _addr = "localhost:50051";
target_str = "--addr=";
p_target = std::strstr(argv[3],target_str);
if (p_target == nullptr) {
printf("para error argv[1] should be --addr=xx \n");
return 0;
}
p_target += std::strlen(target_str);
_addr = p_target;
std::cout << "req for each thread:" << count << std::endl;
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
CompletionQueue one_cq;
std::vector<GreeterClient*> _vec;
std::vector<std::thread*> _vec_t;
for (int i = 0; i < thread_num; ++i) {
auto * _p_client = new GreeterClient(_addr,&one_cq,count);
_vec.push_back(_p_client);
std::string user("world " + std::to_string(i));
for (int j = 0; j < count; j++)
_p_client->SayHello(user, j); // The actual RPC call!
}
auto _start = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < thread_num; i++)
_vec_t.push_back(new std::thread(&GreeterClient::AsyncCompleteRpc, _vec[i]));
//std::this_thread::sleep_for(std::chrono::milliseconds(30));
for (uint32_t i = 0; i < thread_num; i++)
_vec_t[i]->join();
int _total = thread_num * count;
std::cout << "m_total:" << _total << std::endl;
auto _end = std::chrono::steady_clock::now();
auto _ms = std::chrono::duration_cast<std::chrono::milliseconds>(_end - _start);
std::cout << "time cost:" << _ms.count() << std::endl;
uint32_t _throughput = _total / float(_ms.count()) * 1000;
std::cout << "throughput : " << _throughput << std::endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment