@Shehab7osny
Last active October 9, 2021 23:31
Details about my progress throughout GSoC 2020

GSoC 2020 - gRPC R package

Shehab Hosny

Overview and Aim of the project

In this post I wrap up my progress throughout GSoC 2020. Since May, I have been working on adding new functionality to the gRPC R package originally implemented by Neal Fultz in 2017. There were three features to implement: TLS encryption, OAuth2 authentication, and streaming. Each had to be implemented in the grpc package in both the R code and the underlying C wrapper code. Here is the link for my forked repository.

Challenges

One of the main challenges I faced while working on the project was the callback mechanism between the R code and the underlying C wrapper code. It required me to dig deeper into the implementation of the callback functions and the data types they return. For the streaming part, solving this problem correctly took a large amount of time. Additionally, understanding exactly why and when each message and acknowledgment was sent was another challenge; it required a deep understanding of the base code in order to implement streaming without breaking the original code.

So, let's now start discussing how I actually implemented TLS encryption for messages sent by both the client and the server.

1. TLS Encryption

  1. Overview:
    TLS is a security protocol used to provide end-to-end security of data sent between clients and servers. This is vital to ensure that eavesdroppers and hackers are unable to grab sensitive information such as passwords, credit card numbers, and personal correspondence sent over the gRPC connection.

    The following diagram illustrates the TLS handshake process: Image2

  2. Steps to secure the gRPC connection with TLS encryption:

    1. Generate the TLS certificates using the following script:
      rm -f *.pem
      
      # 1. Generate CA's private key and self-signed certificate
      openssl req -x509 -newkey rsa:4096 -days 365 -nodes -keyout ca-key.pem -out ca-cert.pem \
        -subj "/C=EG/ST=CairoEG/L=Cairo/O=GSoC/OU=CerAuth/CN=Cert/emailAddress=ca@gmail.com"
      
      # 2. Generate web server's private key and certificate signing request (CSR)
      openssl req -newkey rsa:4096 -nodes -keyout server-key.pem -out server-req.pem \
        -subj "/C=EG/ST=AlexEG/L=Alex/O=GSoC/OU=Server/CN=Server/emailAddress=server@gmail.com"
      
      # 3. Use CA's private key to sign web server's CSR and get back the signed certificate
      #    (server-ext.cnf must already exist in this directory; its contents are not shown here)
      openssl x509 -req -in server-req.pem -days 60 -CA ca-cert.pem -CAkey ca-key.pem \
        -CAcreateserial -out server-cert.pem -extfile server-ext.cnf
      
      # 4. Generate client's private key and certificate signing request (CSR)
      openssl req -newkey rsa:4096 -nodes -keyout client-key.pem -out client-req.pem \
        -subj "/C=FR/ST=Alsace/L=Strasbourg/O=PC Client/OU=Computer/CN=Client/emailAddress=pcclient@gmail.com"
      
      # 5. Use CA's private key to sign client's CSR and get back the signed certificate
      openssl x509 -req -in client-req.pem -days 60 -CA ca-cert.pem -CAkey ca-key.pem \
        -CAcreateserial -out client-cert.pem
    2. On both the client side and the server side, set the following options for an already implemented server and client:
      # Enable the TLS encryption
      options(UseTLS = TRUE)
      # Provide the folder containing the TLS certificates
      options(CertPath = "/home/gRPC/Certificates/")
    3. If the certificates or the private key are invalid, an error like the following is printed:
      Invalid private key.
      Handshaker factory creation failed with TSI_INVALID_ARGUMENT.
      Failed to create secure subchannel for secure name 'localhost:50051'
      Failed to create channel args during subchannel creation.
      Error in fetch(channel, fn$name, serialize(x, NULL), metadata, UseTLS,  : 
        No response from the gRPC server
      Calls: demo ... eval -> eval -> <Anonymous> -> read -> fetch -> .Call
      Execution halted
Technical Implementation Details

client.cpp file updates:

The createChannel() function is used to create either a secure or insecure channel based on the boolean variable useTLS set by the user.

grpc_channel* channel = 
createChannel(useTLS, server[0], certPath[0], tokenValue[0], isMetadataAttached);

Here is a detailed implementation for the createChannel() function.

grpc_channel* createChannel(bool useTLS, const char* server ,const char* path, 
                            const char* tokenValue, bool isMetadataAttached) {
  
  grpc_channel* channel;

  if(useTLS) {
    grpc_channel_credentials* client_creds = 
      getClientCredentials(path, tokenValue, isMetadataAttached);
    channel = 
      grpc_secure_channel_create(client_creds, server, NULL, nullptr);
  }
  
  else {
    channel = 
      grpc_insecure_channel_create(server, NULL, nullptr);
  }

  return channel;
}

The getClientCredentials() function is used to generate the client credentials based on the TLS certificates provided by the user.

grpc_channel_credentials* getClientCredentials(const char* path, const char* tokenValue, 
                                               bool isMetadataAttached){

  files PEMfiles;

  PEMfiles.CAcert = 
    get_file_contents(((std::string)path + "ca-cert.pem").c_str());
  PEMfiles.clientKey = 
    get_file_contents(((std::string)path + "client-key.pem").c_str());
  PEMfiles.clientCert = 
    get_file_contents(((std::string)path + "client-cert.pem").c_str());
  
  grpc_ssl_pem_key_cert_pair signed_client_key_cert_pair =
    {(PEMfiles.clientKey).c_str(), (PEMfiles.clientCert).c_str()};

  grpc_channel_credentials* creds = grpc_ssl_credentials_create(
    (PEMfiles.CAcert).c_str(), &signed_client_key_cert_pair, nullptr, nullptr);

  if(isMetadataAttached)
    return creds;

  grpc_call_credentials* oauth2Creds = 
    grpc_access_token_credentials_create(getOauth2AccessToken(tokenValue), nullptr);
  
  grpc_channel_credentials* credsTLSOauth =
    grpc_composite_channel_credentials_create(creds, oauth2Creds, nullptr);
    
  return credsTLSOauth;
}
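
Note that getClientCredentials() builds each file name by plain string concatenation, so the folder given in CertPath has to end with a path separator, as it does in the options() example earlier. A minimal sketch of a more forgiving join follows; join_cert_path is a hypothetical helper name used for illustration and is not part of the package.

```cpp
#include <string>

// Hypothetical helper (not part of the grpc package): join a directory and
// a file name, adding the separator only when the directory does not
// already end with one.
std::string join_cert_path(const std::string& dir, const std::string& file) {
  if (dir.empty() || dir.back() == '/')
    return dir + file;
  return dir + "/" + file;
}
```

With a helper like this, "/home/gRPC/Certificates" and "/home/gRPC/Certificates/" would resolve to the same certificate files.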

The get_file_contents() function is used to read the TLS certificate files.

static std::string get_file_contents(const char *fpath) {
  std::ifstream finstream(fpath);

  std::string contents(
	  (std::istreambuf_iterator<char>(finstream)),
	  std::istreambuf_iterator<char>()
	  );

  return contents;
}
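
As written, get_file_contents() returns an empty string when the file cannot be opened, so a wrong path may only show up later as a handshake failure. The sketch below shows one way to fail early instead; read_pem_or_throw is a hypothetical name, and the choice to throw std::runtime_error is an assumption made for this example rather than what the package does.

```cpp
#include <fstream>
#include <iterator>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the grpc package): read a whole file and
// throw if it cannot be opened or turns out to be empty.
std::string read_pem_or_throw(const std::string& fpath) {
  std::ifstream in(fpath, std::ios::binary);
  if (!in)
    throw std::runtime_error("Cannot open certificate file: " + fpath);

  std::string contents((std::istreambuf_iterator<char>(in)),
                       std::istreambuf_iterator<char>());
  if (contents.empty())
    throw std::runtime_error("Certificate file is empty: " + fpath);

  return contents;
}
```

Inside Rcpp code, the exception could be reported to the R user in the same way that stop() is used elsewhere in the wrapper.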

server.cpp file updates:

The createPort() function is used to create either a secure or insecure port based on the boolean variable useTLS set by the user.

RGRPC_LOG("Bind");
int port = 
  createPort(useTLS, server, hoststring[0], CertPath[0]);
params["port"] = port;
runFunctionIfProvided(hooks, "bind", params);

Here is a detailed implementation for the createPort() function.

int createPort(bool useTLS, grpc_server* server, const char* hoststring, const char* CertPath) {

  int port;

  if(useTLS) {
    grpc_server_credentials* credentials = Get_TLS_Credentials(CertPath);
    port = grpc_server_add_secure_http2_port(server, hoststring, credentials);
    grpc_server_credentials_release(credentials);
  }
  
  else {
    port = grpc_server_add_insecure_http2_port(server, hoststring);
  }

  return port;
}

The Get_TLS_Credentials() function is used to generate the server credentials based on the TLS certificates provided by the user.

grpc_server_credentials* Get_TLS_Credentials(const char* path) {

  std::string ca_cert_pem = 
    get_file_contents(((std::string)path + "ca-cert.pem").c_str());
  std::string server_key_pem = 
    get_file_contents(((std::string)path + "server-key.pem").c_str());
  std::string server_cert_pem = 
    get_file_contents(((std::string)path + "server-cert.pem").c_str());

  grpc_ssl_pem_key_cert_pair pem_cert_key_pair =
    {server_key_pem.c_str(), server_cert_pem.c_str()};

  grpc_server_credentials* Creds =
    grpc_ssl_server_credentials_create_ex(
      ca_cert_pem.c_str(),
      &pem_cert_key_pair,
      1,
      GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY,
      nullptr);

  return Creds;
}

2. OAuth2 Authentication

  1. Overview:
    OAuth is an open-standard authorization protocol which lets a service use another service without requiring the security details (username, password, etc.) of the user.

    Steps of the authentication process:

    1. An access token is given to the client application in advance. This token carries the permissions to access some of the server's secured data.
    2. The client application presents the access token when it requests the user's data.
    3. The server extracts the access token from the request and confirms that it grants the client permission to access the secured data.
    4. After the confirmation, the server passes the secured data to the client.
  2. Steps to authenticate the gRPC connection with OAuth2:

    1. In order to authenticate the connection, TLS encryption must first be enabled as illustrated above.
    2. On both the client side and the server side, set the access token as follows:
      # Set the Access Token
      options(AccessToken = "Bearer Token Shehab and Neal")
Technical Implementation Details

client.cpp file updates:

The getOauth2AccessToken() function is used to return the access token set by the user.

const char* getOauth2AccessToken(const char* tokenValue) {
  
  if(tokenValue == nullptr)
    stop("Access token value is not defined");
  
  const char* accessToken(tokenValue + sizeof("Bearer ") - 1);
    
  return accessToken;
}
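
getOauth2AccessToken() skips the first seven characters unconditionally, which assumes that the value always starts with "Bearer ". A minimal sketch of a variant that checks for the prefix first is shown below; strip_bearer_prefix is a hypothetical name, and returning prefix-less tokens unchanged is an assumption made for this example.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical variant (not part of the grpc package): return the token
// without a leading "Bearer " prefix. Values without the prefix are returned
// unchanged; an empty value is rejected.
std::string strip_bearer_prefix(const std::string& tokenValue) {
  static const std::string prefix = "Bearer ";
  if (tokenValue.empty())
    throw std::invalid_argument("Access token value is not defined");
  if (tokenValue.compare(0, prefix.size(), prefix) == 0)
    return tokenValue.substr(prefix.size());
  return tokenValue;
}
```

For the value used in the options() example, this would yield "Token Shehab and Neal".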

server.cpp file updates:

The validateOauth2() function is used to validate the OAuth2 token.

bool validateOauth2(grpc_metadata_array metadataArray, Function checkAuthCallback) {
  
  Authentication_Token Oauth2;
  
  int Index = metadataArray.count - 2;
  
  if (metadataArray.count > 1) {
  
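    // grpc_slice_to_c_string() returns a heap-allocated copy that the caller
    // must release with gpr_free() after copying it into a std::string.
    char* rawValue = grpc_slice_to_c_string(metadataArray.metadata[Index].value);
    Oauth2.Value = rawValue;
    gpr_free(rawValue);
  
    char* rawKey = grpc_slice_to_c_string(metadataArray.metadata[Index].key);
    Oauth2.Key = rawKey;
    gpr_free(rawKey);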
  
    bool Check = Rcpp::as<bool>(checkAuthCallback(Oauth2.Value));
  
    if(!Check)
      return false;
  }
  
  return true;
}
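
validateOauth2() reads the entry at position count - 2, which relies on a fixed ordering of the metadata entries. A possible alternative, sketched here under the assumption that the token travels under the authorization key, is to search the entries by key. MetadataEntry and find_metadata are hypothetical stand-ins for illustration; the real code would read keys and values out of the grpc_metadata slices.

```cpp
#include <string>
#include <vector>

// Stand-in for one gRPC metadata entry (hypothetical type for this sketch).
struct MetadataEntry {
  std::string key;
  std::string value;
};

// Return a pointer to the value stored under `key`, or nullptr if no entry
// with that key exists. The first match wins.
const std::string* find_metadata(const std::vector<MetadataEntry>& entries,
                                 const std::string& key) {
  for (const MetadataEntry& entry : entries)
    if (entry.key == key)
      return &entry.value;
  return nullptr;
}
```

A lookup such as find_metadata(entries, "authorization") would then hand the token value to the R callback regardless of where it sits in the array.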

3. Streaming

  1. Overview:
    gRPC supports streaming connections, where either the client or the server (or both) sends a stream of messages on a single RPC call. When the server sends a stream of messages in response to a single client message, this is called server-side streaming. When the client sends a stream of messages and waits for a single server message, this is called client-side streaming. The most general case is bidirectional streaming, where a single gRPC call establishes a stream in which both the client and the server can send a stream of messages to each other. The streamed messages are delivered in the order they were sent. This section covers server-side streaming.

    Steps of the server-side streaming process:

    1. The client sends a single message to the server.
    2. The server receives the message and starts operating on it.
    3. The server sends multiple responses to the client.
      In this step, the server side uses a mutex-like lock to avoid sending the next message until the client side confirms that it has received the current one. This is described in detail in the Technical Implementation Details section.
  2. Steps to use server-side streaming:

    1. In the proto file, add a new rpc with the keyword stream before the response type as follows:
      rpc SayEverything (GreetingRequest) returns (stream GreetingReply);
    2. On the client side, the server's responses are accessed one at a time and stored in a list.
      The user will have the ability to either operate on each response at a time or operate on the list of responses as a whole.

      Operate on each response independently in the client.R file:
      while(1) {
        message <- read(
          ResponseDescriptor,
          fetch(channel, fn$name, serialize(x, NULL), metadata, UseTLS, CertPath, AccessToken, ClientDeadline)
        )

        # Operate on each response right here

        if(as.list(message) == '')
          break;

        List_messages[[i]] <- message
        i <- i + 1
      }

      Operate on the list of responses as a whole in the helloclient.R file:
      everything <- client$SayEverything$build(name='Shehab')
      queue <- client$SayEverything$call(everything)
      
      for (message in queue) {
      	# Operate on each response right here
      }
    3. On the server side, the user can add as many responses as needed using the newResponse() R function as follows:
      impl$SayEverything$f <- function(request){
      	newResponse(message = paste('Your name is ', request$name))
      	newResponse(message = paste('Hello,', request$name))
      	newResponse(message = paste('I hope you are enjoying your day,', request$name))
      	newResponse(message = paste('Thanks,', request$name))
      	newResponse(message = paste('Have a great day,', request$name))
      	newResponse(message = paste('Bye,', request$name))
      }
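
The client-side loop shown above stops as soon as an empty message arrives. The same pattern can be sketched in isolation as follows; collect_stream and its fetchNext callback are hypothetical names, and plain strings stand in for the decoded protobuf responses.

```cpp
#include <functional>
#include <string>
#include <vector>

// Collect responses from `fetchNext` until it yields an empty message,
// which this sketch treats as the end-of-stream marker. The marker itself
// is not stored.
std::vector<std::string> collect_stream(
    const std::function<std::string()>& fetchNext) {
  std::vector<std::string> messages;
  while (true) {
    std::string message = fetchNext();
    if (message.empty())
      break;
    messages.push_back(message);
  }
  return messages;
}
```

One consequence of this design is that a legitimately empty response cannot be told apart from the end of the stream, so the server should avoid sending empty messages mid-stream.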
Technical Implementation Details

client.R file updates:

client_functions <- lapply(impl, function(fn) {
  RequestDescriptor  <- P(fn[["RequestType"]]$proto)
  ResponseDescriptor <- P(fn[["ResponseType"]]$proto)
  stream_bool <- fn[["ResponseType"]]$stream

  message <- list(
    call = function(x, metadata = character(0)) {
      if (stream_bool) {
        # Server-side streaming: keep fetching until an empty message arrives
        i <- 1
        List_messages <- list()
        while (1) {
          message <- read(
            ResponseDescriptor,
            fetch(channel, fn$name, serialize(x, NULL), metadata, UseTLS, CertPath,
                  AccessToken, ClientDeadline)
          )

          if (as.list(message) == '')
            break

          List_messages[[i]] <- message
          i <- i + 1
        }
      } else {
        # Unary call: a single response is returned as is
        List_messages <- read(
          ResponseDescriptor,
          fetch(channel, fn$name, serialize(x, NULL), metadata, UseTLS, CertPath,
                AccessToken, ClientDeadline)
        )
      }

      List_messages
    },

    build = function(...) {
      new(RequestDescriptor, ...)
    }
  )
  message
})

server.cpp file updates:

// [[Rcpp::export]]
List streamMessage(RawVector response) {
  
  grpc_op ops[6];
  grpc_op *op;
  int was_cancelled = 2;
      
  if(messageCount > 1) {
      //Client Mutex starts here
      do {
          memset(ops, 0, sizeof(ops));
  
          grpc_server_request_call(*Server_Global, Call_Global,
                           Details_Global, Metadata_Global, *Queue_Global, *Queue_Global, NULL);
  
          gpr_timespec c_increment = 
              gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN);
  
          gpr_timespec c_timeout = 
              gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment);
  
          *Event_Global = grpc_completion_queue_next(*Queue_Global, c_timeout, RESERVED);
  
      } while((*Event_Global).type != GRPC_OP_COMPLETE);
      //Client Mutex ends here
 
      memset(ops, 0, sizeof(ops));
      op = ops;
  
      RGRPC_LOG("GRPC_OP_SEND_INITIAL_METADATA");
      op -> op = GRPC_OP_SEND_INITIAL_METADATA;
      op -> data.send_initial_metadata.count = 0;
      op -> data.send_initial_metadata.maybe_compression_level.is_set = false;
      op -> flags = 0;
      op -> reserved = NULL;
      op++;
  
      RGRPC_LOG("GRPC_OP_RECV_MESSAGE");
      op -> op = GRPC_OP_RECV_MESSAGE;
      op -> data.recv_message.recv_message = Request_Global;
      op -> flags = 0;
      op -> reserved = NULL;
      op++;
  
      grpc_call_start_batch(*Call_Global, ops, (size_t)(op - ops), NULL, NULL);
      grpc_completion_queue_next(*Queue_Global, *Timespec_Global, RESERVED);
  }
  
  messageCount = messageCount + 1;

  char const * status_details_string = "OK";
  
  int len = response.length();
  SEXP raw_ = response;
  grpc_slice response_payload_slice = grpc_slice_from_copied_buffer((char*) RAW(raw_), len);
  grpc_byte_buffer* response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
  grpc_slice_unref(response_payload_slice);
 
  memset(ops, 0, sizeof(ops));
  op = ops;
    
  RGRPC_LOG("GRPC_OP_RECV_CLOSE_ON_SERVER");
  op -> op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  op -> data.recv_close_on_server.cancelled = &was_cancelled;
  op -> flags = 0;
  op -> reserved = NULL;
  op++;
    
  RGRPC_LOG("GRPC_OP_SEND_MESSAGE");
  op -> op = GRPC_OP_SEND_MESSAGE;
  op -> data.send_message.send_message = response_payload;
  op -> flags = 0;
  op -> reserved = NULL;
  op++;
    
  RGRPC_LOG("GRPC_OP_SEND_STATUS_FROM_SERVER 1");
  op -> op = GRPC_OP_SEND_STATUS_FROM_SERVER;
  op -> data.send_status_from_server.trailing_metadata_count = 0;
   
  op -> data.send_status_from_server.status = *Status_Code_Global;
  grpc_slice status_details = grpc_slice_from_static_string(status_details_string);
  
  op -> data.send_status_from_server.status_details = &status_details;
  op -> flags = 0;
  op -> reserved = NULL;
  op++;
    
  RGRPC_LOG("Starts the process of Streaming!");
  grpc_call_start_batch(*Call_Global, ops, (size_t)(op - ops), NULL, NULL);
  *Event_Global = grpc_completion_queue_next(*Queue_Global, *Timespec_Global, RESERVED);
    
  grpc_byte_buffer_destroy(response_payload);
  
  return List::create();
}
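
The loop marked "Client Mutex" in streamMessage() keeps polling the completion queue until an operation completes, which matches the rule from the overview that the next response is held back until the client has confirmed the current one. The sketch below models that lock-step rule on its own, without any gRPC calls; LockStepSender is a hypothetical class used only for illustration and does not appear in the package.

```cpp
#include <string>
#include <vector>

// Hypothetical model of the server-side gate: a new response may be sent
// only after the client has acknowledged the previous one.
class LockStepSender {
 public:
  // Try to send `message`. Returns false, and sends nothing, while an
  // earlier message is still waiting for its acknowledgment.
  bool trySend(const std::string& message) {
    if (awaitingAck_)
      return false;
    sent_.push_back(message);
    awaitingAck_ = true;
    return true;
  }

  // Called when the client confirms receipt of the last message.
  void acknowledge() { awaitingAck_ = false; }

  // Messages sent so far, in order.
  const std::vector<std::string>& sent() const { return sent_; }

 private:
  std::vector<std::string> sent_;
  bool awaitingAck_ = false;
};
```

In the real server the acknowledgment is observed through the completion queue rather than an explicit acknowledge() call, so this class only captures the ordering constraint.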

Commits List
