Skip to content

Instantly share code, notes, and snippets.

@matthauck
Last active April 22, 2022 23:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthauck/206f17ffe811706d8f8ce09598f2c60d to your computer and use it in GitHub Desktop.
Save matthauck/206f17ffe811706d8f8ce09598f2c60d to your computer and use it in GitHub Desktop.
GRPC Client API
package testing_api;
// Test service with one RPC for each combination of single/streamed
// request and single/streamed response.
service GrpcTest
{
// Single Request in, single Response out.
rpc Unary( Request ) returns ( Response );
// Stream of Requests in, single Response out.
rpc ClientStream( stream Request ) returns ( Response );
// Single Request in, stream of Responses out.
rpc ServerStream( Request ) returns ( stream Response );
// Stream of Requests in, stream of Responses out.
rpc AllStream( stream Request ) returns ( stream Response );
}
// Request message used by every GrpcTest RPC.
message Request
{
// Opaque byte payload (field number 1).
bytes data = 1;
}
// Response message used by every GrpcTest RPC.
message Response
{
// Opaque byte payload (field number 1).
bytes data = 1;
}
#include "GrpcTestClient_api.gen.h"
#include "NetworkAsync/HTTPAsync/AsyncHTTPHeaders.h"
#include "Core/Ownership/MakeUnique.h"
namespace
{

// Service name handed to GrpcClientConnection::NewRequestContext() for every call
// issued by this client.
// NOTE(review): the .proto declares `package testing_api`; confirm whether
// NewRequestContext() expects the package-qualified name here.
constexpr const char* kService = "GrpcTest";

} // namespace
// Binds the client to `connection`. Only a reference is stored, so the connection
// has to outlive this client.
GrpcTestGRPCClient::GrpcTestGRPCClient( GrpcClientConnection& connection )
    : m_connection( connection )
{
    // Nothing else to set up.
}
// Takes shared ownership of the connection-level call context and attaches the
// request writer / response reader to that context's sink and source.
//
// m_ctx is initialized first; the other two initializers dereference it.
GrpcTestGRPCClient::UnaryContext::UnaryContext(
    std::shared_ptr< GrpcClientConnection::Context > context,
    const grpc::Settings& settings )
    : m_ctx( std::move( context ) )
    , m_request( m_ctx->requestSink, settings )
    , m_response( m_ctx->responseSource, settings )
{
    // All state is set up by the member initializers.
}
// Reads the response message of the unary call.
//
// Returns the decoded testing_api::Response when m_response.ReadArgument()
// succeeds, otherwise a non-OK status describing the failure.
absl::StatusOr< testing_api::Response > GrpcTestGRPCClient::UnaryContext::ReadResponse()
{
    testing_api::Response out;
    if ( m_response.ReadArgument( out ) )
    {
        return absl::StatusOr< testing_api::Response >( std::move( out ) );
    }

    const auto& readerStatus = m_response.Status();
    if ( readerStatus.ok() )
    {
        // An absl::StatusOr must never be constructed from an OK status: debug
        // builds assert and optimized builds silently substitute kInternal.
        // Report the same code explicitly, with a message that explains it.
        return absl::InternalError( "Failed to read response: no message was read and the reader reported no error" );
    }
    return readerStatus;
}
// Starts a "Unary" call on the connection: creates the request context, writes
// the single request message `arg`, then ends the request side.
//
// Returns the call context, from which the response can be read.
// Throws DetailedException when the request message cannot be written.
std::unique_ptr< GrpcTestGRPCClient::UnaryContext > GrpcTestGRPCClient::Unary( AsyncHTTPRequestHeaders headers,
                                                                                const testing_api::Request& arg,
                                                                                TimeDifference timeout,
                                                                                EventCallback& callback )
{
    auto context = m_connection.NewRequestContext( std::move( headers ), kService, "Unary", timeout, callback );
    auto call = std::make_unique< UnaryContext >( std::move( context ), m_connection.GetSettings() );

    auto& writer = call->m_request;
    if ( writer.Write( arg ) )
    {
        // Exactly one request message is sent, so close the request side right away.
        call->m_ctx->requestSink.End();
        return call;
    }

    throw DetailedException( "Failed to write request: " + writer.Status().ToString() );
}
// Takes shared ownership of the connection-level call context and attaches the
// request writer / response reader to that context's sink and source.
//
// m_ctx is initialized first; the other two initializers dereference it.
GrpcTestGRPCClient::ClientStreamContext::ClientStreamContext(
    std::shared_ptr< GrpcClientConnection::Context > context,
    const grpc::Settings& settings )
    : m_ctx( std::move( context ) )
    , m_request( m_ctx->requestSink, settings )
    , m_response( m_ctx->responseSource, settings )
{
    // All state is set up by the member initializers.
}
// Reads the single response message of the client-streaming call.
//
// Returns the decoded testing_api::Response when m_response.ReadArgument()
// succeeds, otherwise a non-OK status describing the failure.
absl::StatusOr< testing_api::Response > GrpcTestGRPCClient::ClientStreamContext::ReadResponse()
{
    testing_api::Response out;
    if ( m_response.ReadArgument( out ) )
    {
        return absl::StatusOr< testing_api::Response >( std::move( out ) );
    }

    const auto& readerStatus = m_response.Status();
    if ( readerStatus.ok() )
    {
        // An absl::StatusOr must never be constructed from an OK status: debug
        // builds assert and optimized builds silently substitute kInternal.
        // Report the same code explicitly, with a message that explains it.
        return absl::InternalError( "Failed to read response: no message was read and the reader reported no error" );
    }
    return readerStatus;
}
// Starts a "ClientStream" call on the connection and returns its context.
//
// Nothing is written here: the caller sends request messages through the
// returned context and ends the request side itself.
std::unique_ptr< GrpcTestGRPCClient::ClientStreamContext > GrpcTestGRPCClient::ClientStream( AsyncHTTPRequestHeaders headers,
                                                                                              TimeDifference timeout,
                                                                                              EventCallback& callback )
{
    // Kept as its own statement so the context is created before GetSettings() is called.
    auto context = m_connection.NewRequestContext( std::move( headers ), kService, "ClientStream", timeout, callback );

    return std::make_unique< ClientStreamContext >( std::move( context ), m_connection.GetSettings() );
}
// Takes shared ownership of the connection-level call context and attaches the
// request writer / response reader to that context's sink and source.
//
// m_ctx is initialized first; the other two initializers dereference it.
GrpcTestGRPCClient::ServerStreamContext::ServerStreamContext(
    std::shared_ptr< GrpcClientConnection::Context > context,
    const grpc::Settings& settings )
    : m_ctx( std::move( context ) )
    , m_request( m_ctx->requestSink, settings )
    , m_response( m_ctx->responseSource, settings )
{
    // All state is set up by the member initializers.
}
// Starts a "ServerStream" call on the connection: creates the request context,
// writes the single request message `arg`, then ends the request side.
//
// Returns the call context, whose reader yields the streamed responses.
// Throws DetailedException when the request message cannot be written.
std::unique_ptr< GrpcTestGRPCClient::ServerStreamContext > GrpcTestGRPCClient::ServerStream( AsyncHTTPRequestHeaders headers,
                                                                                              const testing_api::Request& arg,
                                                                                              TimeDifference timeout,
                                                                                              EventCallback& callback )
{
    auto context = m_connection.NewRequestContext( std::move( headers ), kService, "ServerStream", timeout, callback );
    auto call = std::make_unique< ServerStreamContext >( std::move( context ), m_connection.GetSettings() );

    auto& writer = call->m_request;
    if ( writer.Write( arg ) )
    {
        // Exactly one request message is sent, so close the request side right away.
        call->m_ctx->requestSink.End();
        return call;
    }

    throw DetailedException( "Failed to write request: " + writer.Status().ToString() );
}
// Takes shared ownership of the connection-level call context and attaches the
// request writer / response reader to that context's sink and source.
//
// m_ctx is initialized first; the other two initializers dereference it.
GrpcTestGRPCClient::AllStreamContext::AllStreamContext(
    std::shared_ptr< GrpcClientConnection::Context > context,
    const grpc::Settings& settings )
    : m_ctx( std::move( context ) )
    , m_request( m_ctx->requestSink, settings )
    , m_response( m_ctx->responseSource, settings )
{
    // All state is set up by the member initializers.
}
// Starts an "AllStream" call on the connection and returns its context.
//
// Nothing is written here: the caller drives both the request writer and the
// response reader through the returned context.
std::unique_ptr< GrpcTestGRPCClient::AllStreamContext > GrpcTestGRPCClient::AllStream( AsyncHTTPRequestHeaders headers,
                                                                                        TimeDifference timeout,
                                                                                        EventCallback& callback )
{
    // Kept as its own statement so the context is created before GetSettings() is called.
    auto context = m_connection.NewRequestContext( std::move( headers ), kService, "AllStream", timeout, callback );

    return std::make_unique< AllStreamContext >( std::move( context ), m_connection.GetSettings() );
}
#pragma once
#include "absl/status/statusor.h"
#include "GRPC/GRPCReader.h"
#include "GRPC/GRPCWriter.h"
#include "NetworkAsync/GrpcClientConnection.h"
#include "GrpcTest_api.pb.h"
class EventCallback;
class TimeDifference;
class AsyncHTTPRequestHeaders;
// Client for the testing_api.GrpcTest service.
//
// Each RPC has a method that starts the call on the bound GrpcClientConnection
// and returns a per-call context object; the context exposes whatever the caller
// still has to drive for that call (request writer, response reader, status).
class GrpcTestGRPCClient
{
public:
// Binds the client to a connection. Only a reference is kept (see m_connection),
// so the connection has to outlive the client.
explicit GrpcTestGRPCClient( GrpcClientConnection& );
// testing_api.GrpcTest.Unary
// Per-call state for Unary: single request (written by Unary() itself), single response.
class UnaryContext
{
public:
UnaryContext( std::shared_ptr< GrpcClientConnection::Context >, const grpc::Settings& );
// Reads the response message; yields a status instead when reading fails.
absl::StatusOr< testing_api::Response > ReadResponse();
// Returns a copy of the value behind m_ctx->status.Lock().
// NOTE(review): presumably empty until the call has finished - confirm.
Potential< absl::Status > GetStatus() const { return *m_ctx->status.Lock(); }
private:
friend class GrpcTestGRPCClient;
// Declared first on purpose: the constructor initializes m_request and
// m_response from members of *m_ctx.
std::shared_ptr< GrpcClientConnection::Context > m_ctx;
grpc::Writer< testing_api::Request > m_request;
grpc::Reader< testing_api::Response > m_response;
};
// Starts a Unary call, sending the given request message.
std::unique_ptr< UnaryContext > Unary( AsyncHTTPRequestHeaders,
const testing_api::Request&,
TimeDifference timeout,
EventCallback& );
// testing_api.GrpcTest.ClientStream
// Per-call state for ClientStream: caller-driven request stream, single response.
class ClientStreamContext
{
public:
ClientStreamContext( std::shared_ptr< GrpcClientConnection::Context >, const grpc::Settings& );
// Writer used by the caller to send request messages.
grpc::Writer< testing_api::Request >& GetRequest() { return m_request; }
// Ends the request side of the call (calls End() on the context's request sink).
void EndRequest() { m_ctx->requestSink.End(); }
// Reads the response message; yields a status instead when reading fails.
absl::StatusOr< testing_api::Response > ReadResponse();
// Returns a copy of the value behind m_ctx->status.Lock().
// NOTE(review): presumably empty until the call has finished - confirm.
Potential< absl::Status > GetStatus() const { return *m_ctx->status.Lock(); }
private:
friend class GrpcTestGRPCClient;
// Declared first on purpose: the constructor initializes m_request and
// m_response from members of *m_ctx.
std::shared_ptr< GrpcClientConnection::Context > m_ctx;
grpc::Writer< testing_api::Request > m_request;
grpc::Reader< testing_api::Response > m_response;
};
// Starts a ClientStream call; no request message is written by this method.
std::unique_ptr< ClientStreamContext > ClientStream( AsyncHTTPRequestHeaders,
TimeDifference timeout,
EventCallback& );
// testing_api.GrpcTest.ServerStream
// Per-call state for ServerStream: single request (written by ServerStream() itself),
// caller-driven response stream.
class ServerStreamContext
{
public:
ServerStreamContext( std::shared_ptr< GrpcClientConnection::Context >, const grpc::Settings& );
// Reader used by the caller to receive the streamed response messages.
grpc::Reader< testing_api::Response >& GetResponse() { return m_response; }
// Returns a copy of the value behind m_ctx->status.Lock().
// NOTE(review): presumably empty until the call has finished - confirm.
Potential< absl::Status > GetStatus() const { return *m_ctx->status.Lock(); }
private:
friend class GrpcTestGRPCClient;
// Declared first on purpose: the constructor initializes m_request and
// m_response from members of *m_ctx.
std::shared_ptr< GrpcClientConnection::Context > m_ctx;
grpc::Writer< testing_api::Request > m_request;
grpc::Reader< testing_api::Response > m_response;
};
// Starts a ServerStream call, sending the given request message.
std::unique_ptr< ServerStreamContext > ServerStream( AsyncHTTPRequestHeaders,
const testing_api::Request&,
TimeDifference timeout,
EventCallback& );
// testing_api.GrpcTest.AllStream
// Per-call state for AllStream: caller-driven request stream and response stream.
class AllStreamContext
{
public:
AllStreamContext( std::shared_ptr< GrpcClientConnection::Context >, const grpc::Settings& );
// Writer used by the caller to send request messages.
grpc::Writer< testing_api::Request >& GetRequest() { return m_request; }
// Ends the request side of the call (calls End() on the context's request sink).
void EndRequest() { m_ctx->requestSink.End(); }
// Reader used by the caller to receive the streamed response messages.
grpc::Reader< testing_api::Response >& GetResponse() { return m_response; }
// Returns a copy of the value behind m_ctx->status.Lock().
// NOTE(review): presumably empty until the call has finished - confirm.
Potential< absl::Status > GetStatus() const { return *m_ctx->status.Lock(); }
private:
friend class GrpcTestGRPCClient;
// Declared first on purpose: the constructor initializes m_request and
// m_response from members of *m_ctx.
std::shared_ptr< GrpcClientConnection::Context > m_ctx;
grpc::Writer< testing_api::Request > m_request;
grpc::Reader< testing_api::Response > m_response;
};
// Starts an AllStream call; no request message is written by this method.
std::unique_ptr< AllStreamContext > AllStream( AsyncHTTPRequestHeaders,
TimeDifference timeout,
EventCallback& );
private:
// Connection every call is issued on; held by reference, not owned.
GrpcClientConnection& m_connection;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment