@typhoonzero
Last active May 29, 2018 03:10
An example of how to abstract the Fluid RPC server
#include <atomic>
#include <condition_variable>
#include <functional>
#include <string>
#include <unordered_map>
class RPCServer;
/*
* Handler Class for the server to use
*/
class RequestHandler {
public:
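explicit RequestHandler(bool sync_mode);
// Virtual destructor so derived handlers can be destroyed
// through a RequestHandler pointer.
virtual ~RequestHandler();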
// Handle must be thread-safe so that all
// handle calls can be run in a thread pool.
virtual bool Handle() = 0;
protected:
// handler context used for processing
framework::Scope scope_;
platform::DeviceContext dev_ctx_;
framework::Executor exe_;
framework::Program update_program_;
framework::Program prefetch_program_;
RPCServer* rpc_server_;
bool sync_mode_;
};
/*
* Base interface for Fluid RPC communication calls
*/
class RPCServer {
public:
RPCServer(const std::string& bind_address,
int client_num,
int thread_pool_size);
virtual ~RPCServer();
virtual void StartServer() = 0;
virtual void WaitServerReady() = 0;
virtual void Shutdown() = 0;
virtual int GetSelectedPort();
void SavePortFile();
// RunEventLoop receives client RPC requests
// and processes each request using the registered
// handlers in a thread pool.
virtual void RunEventLoop() = 0;
public:
// RegisterRPC registers the RPC method name to a handler
// class and auto-generates a condition id for this call,
// to be used for the barrier (rpc_call_map_ and rpc_cond_map_
// will be updated).
void RegisterRPC(const std::string& rpc_name,
RequestHandler* handler);
// Wait until all the clients have reached the barrier for one
// RPC method. This function should be called in the
// RequestHandler if you want to run the server/client in
// synchronous mode.
void WaitBarrier(const std::string& rpc_name);
private:
void ResetBarrierCounter();
private:
std::unordered_map<std::string, int> barrier_counter_;
protected:
// Assumed callable signature: mirrors RequestHandler::Handle().
std::unordered_map<std::string, std::function<bool()>> rpc_call_map_;
std::unordered_map<std::string, int> rpc_cond_map_;
std::condition_variable barrier_cond_;
int thread_pool_size_;
int client_num_;
std::atomic<int> exit_flag_;
framework::ThreadPool rpcserver_thread_pool_;
};
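The `WaitBarrier` comment above describes a per-RPC barrier that releases once every client has arrived. Below is a minimal, self-contained sketch of one way that could work. It is not the gist author's implementation: `BarrierSketch`, its `State` struct, the `generation` counter, and `RunBarrierDemo` are hypothetical names introduced only for illustration. The sketch also adds a `std::mutex`, which the header above does not declare but which `std::condition_variable` needs in order to wait, and it folds `barrier_counter_` and `rpc_cond_map_` into a single per-name record.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the barrier part of RPCServer.
class BarrierSketch {
 public:
  explicit BarrierSketch(int client_num) : client_num_(client_num) {}

  // Blocks the caller until client_num_ callers have arrived
  // for the given RPC name, then releases all of them.
  void WaitBarrier(const std::string& rpc_name) {
    std::unique_lock<std::mutex> lock(mutex_);
    // References into std::unordered_map stay valid across rehashing.
    State& st = states_[rpc_name];
    const int my_generation = st.generation;
    if (++st.arrived == client_num_) {
      // Last arrival: reset the counter (the role ResetBarrierCounter
      // might play) and start a new generation so the barrier is reusable.
      st.arrived = 0;
      ++st.generation;
      cond_.notify_all();
      return;
    }
    // The generation check guards against spurious wakeups.
    cond_.wait(lock, [&st, my_generation] {
      return st.generation != my_generation;
    });
  }

 private:
  struct State {
    int arrived = 0;     // callers waiting in the current round
    int generation = 0;  // incremented each time the barrier opens
  };

  const int client_num_;
  std::mutex mutex_;
  std::condition_variable cond_;
  std::unordered_map<std::string, State> states_;
};

// Runs `clients` threads through the barrier `rounds` times and
// returns the total number of completed barrier passes.
int RunBarrierDemo(int clients, int rounds) {
  BarrierSketch barrier(clients);
  std::atomic<int> passed{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < clients; ++i) {
    threads.emplace_back([&barrier, &passed, rounds] {
      for (int r = 0; r < rounds; ++r) {
        barrier.WaitBarrier("send");
        ++passed;
      }
    });
  }
  for (auto& t : threads) t.join();
  return passed.load();
}
```

In a synchronous-mode handler, `Handle()` would presumably call `WaitBarrier` with its own RPC name before running the update program, so that no trainer's update is applied until every client has sent its request. The generation counter is a design choice of this sketch: it lets the same barrier be reused round after round without a fast thread from the next round being confused with a waiter from the previous one.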