Last active
May 29, 2018 03:10
-
-
Save typhoonzero/fc2489231c8e3a29d9ac7c895c8f4aae to your computer and use it in GitHub Desktop.
An example of how to abstract fluid rpc server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string> | |
#include <condition> | |
#include <atomic> | |
class RPCServer; | |
/* | |
* Handler Class for the server to use | |
*/ | |
class RequestHandler { | |
public: | |
RequestHandler(bool sync_mode); | |
// Handle must be thread-safe so that all | |
// handle calls can be run in a thread pool. | |
virtual bool Handle() = 0; | |
protected: | |
// handler context used for processing | |
framework::Scope scope_; | |
platform::DeviceContext dev_ctx_; | |
framework::Executor exe_; | |
framework::Program update_program_; | |
framework::Program prefetch_program_; | |
RPCServer* rpc_server; | |
bool sync_mode_; | |
}; | |
/* | |
* Base interface for Fluid RPC communication calls | |
*/ | |
class RPCServer { | |
public: | |
RPCServer(const std::string& bind_address, | |
int client_num, | |
int thread_pool_size); | |
virtual ~RPCServer(); | |
virtual StartServer() = 0; | |
virtual WaitServerReady() = 0; | |
virtual Shutdown() = 0; | |
virtual int GetSelectedPort(); | |
void SavePortFile(); | |
// RunEventLoop receives a client RPC request | |
// and Process the request using registered | |
// handlers in a thread pool. | |
virtual void RunEventLoop() = 0; | |
public: | |
// RegisterRPC, register the rpc method name to a handler | |
// class, and auto generate a condition id for this call | |
// to be used for the barrier (rpc_call_map_ and rpc_cond_map | |
// will be updated). | |
void RegisterRPC(const std::string& rpc_name, | |
RequestHandler* handler); | |
// Wait util all the client have reached the barrier for one | |
// rpc method. This function should be called in the | |
// RequestHandler if you want to run the server/client in a | |
// synchronous mode. | |
void WaitBarrier(const std::string& rpc_name); | |
private: | |
void ResetBarrierCounter(); | |
private: | |
std::unordered_map<std::string, int> barrier_counter_; | |
protected: | |
std::unordered_map<std::string, std::function> rpc_call_map_; | |
std::unordered_map<std::string, int> rpc_cond_map_; | |
std::condition_variable barrier_cond_; | |
int thread_pool_size_; | |
int client_num_; | |
atomic<int> exit_flag_; | |
framework::ThreadPool rpcserver_thread_pool_; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment