Created
October 8, 2019 16:24
-
-
Save devendranaga/07be345518682e19331e00fa1aeac9ed to your computer and use it in GitHub Desktop.
Grand Central Dispatch in C++
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @description GCD in C++ | |
* | |
* @copyright 2019-present Devendra Naga (devnaga@tuta.io) All rights reserved | |
*/ | |
#include <iostream> | |
#include <memory> | |
#include <string> | |
#include <vector> | |
#include <thread> | |
#include <mutex> | |
#include <functional> | |
#include <queue> | |
#include <condition_variable> | |
#include <sys/socket.h> | |
#include <unistd.h> | |
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <signal.h> | |
#include <sys/timerfd.h> | |
#include <sys/signalfd.h> | |
// timer callback | |
typedef std::function<void(void)> Timer_Fn; | |
// socket callback | |
typedef std::function<void(int)> Socket_Fn; | |
// signal callback | |
typedef std::function<void(int)> Signal_Fn; | |
// job callback | |
typedef std::function<void(void)> Job_Fn; | |
// a process that is instantiated by the Parallel | |
class Parallel_Proc { | |
public: | |
explicit Parallel_Proc(); | |
~Parallel_Proc(); | |
void Queue_Work(Job_Fn job); | |
void Stop() | |
{ | |
std::unique_lock<std::mutex> lock(lock_); | |
stop_signal_ = true; | |
printf("Gcd: stop the process\n"); | |
cv_.notify_one(); | |
} | |
private: | |
void Process(); | |
std::unique_ptr<std::thread> proc_; | |
std::queue<Job_Fn> jobs_; | |
std::mutex lock_; | |
std::condition_variable cv_; | |
bool stop_signal_ = false; | |
}; | |
void Parallel_Proc::Queue_Work(Job_Fn job) | |
{ | |
std::unique_lock<std::mutex> lock(lock_); | |
jobs_.push(job); | |
cv_.notify_one(); | |
} | |
void Parallel_Proc::Process() | |
{ | |
sigset_t block; | |
// block termination signals ..temporarily | |
sigemptyset(&block); | |
sigaddset(&block, SIGINT); | |
sigprocmask(SIG_BLOCK, &block, 0); | |
while (1) { | |
Job_Fn job_to_do = nullptr; | |
{ | |
std::unique_lock<std::mutex> lock(lock_); | |
cv_.wait(lock); | |
if (stop_signal_) { | |
break; | |
} | |
job_to_do = jobs_.front(); | |
jobs_.pop(); | |
} | |
if (job_to_do) | |
job_to_do(); | |
} | |
} | |
Parallel_Proc::Parallel_Proc() | |
{ | |
proc_ = std::make_unique<std::thread>(&Parallel_Proc::Process, this); | |
proc_->detach(); | |
} | |
Parallel_Proc::~Parallel_Proc() | |
{ | |
} | |
class Parallel { | |
public: | |
explicit Parallel(); | |
~Parallel(); | |
void Queue_Work(Job_Fn job); | |
void StopAll() | |
{ | |
for (auto it : procs_) { | |
it->Stop(); | |
} | |
} | |
private: | |
std::vector<std::shared_ptr<Parallel_Proc>> procs_; | |
int n_threads_; | |
}; | |
// queue work on a random thread | |
void Parallel::Queue_Work(Job_Fn job) | |
{ | |
int thread_id = rand() % n_threads_; | |
procs_[thread_id]->Queue_Work(job); | |
} | |
Parallel::Parallel() | |
{ | |
// get number of threads | |
n_threads_ = std::thread::hardware_concurrency(); | |
printf("Gcd: create [%d] threads for work\n", n_threads_); | |
for (int i = 0; i < std::thread::hardware_concurrency(); i ++) { | |
std::shared_ptr<Parallel_Proc> pp = std::make_shared<Parallel_Proc>(); | |
procs_.push_back(pp); | |
} | |
struct timespec ts; | |
clock_gettime(CLOCK_REALTIME, &ts); | |
srand(ts.tv_nsec * 1000); | |
} | |
Parallel::~Parallel() | |
{ | |
} | |
struct Gcd_Timer { | |
int sec_; | |
int usec_; | |
int fd_; | |
Timer_Fn timer_fn_; | |
}; | |
struct Gcd_Socket { | |
int fd_; | |
Socket_Fn socket_fn_; | |
}; | |
struct Gcd_Signal { | |
int sig; | |
std::vector<Signal_Fn> signal_fn_; | |
}; | |
// Grand Central Dispatch main class | |
class Gcd { | |
public: | |
static Gcd *Instance() | |
{ | |
static Gcd gcd; | |
return &gcd; | |
} | |
~Gcd(); | |
// create timer event with sec, usec and a callback | |
int Create_Timer_Event(int sec, int usec, Timer_Fn timer_fn) noexcept; | |
// create socket event with fd and a callback | |
int Create_Socket_Event(int fd, Socket_Fn socket_fn) noexcept; | |
// create signal event with signal no and a callback | |
int Create_Signal_Event(int sig, Signal_Fn signal_fn) noexcept; | |
// run an execution context | |
void Run_Execution(Job_Fn job) noexcept; | |
// run the main GCD | |
void Start(); | |
// terminate the wheel | |
void Terminate() { terminate_ = true; } | |
private: | |
// list of timers | |
std::vector<Gcd_Timer> timers_; | |
// list of sockets | |
std::vector<Gcd_Socket> sockets_; | |
// list of signals | |
std::vector<Gcd_Signal> signals_; | |
// parallel context | |
Parallel p_; | |
// set to true when Terminate() is called | |
bool terminate_ = false; | |
// fd to handle the signals | |
int signal_fd_; | |
// maks of all the signals | |
sigset_t signal_masks_; | |
// maks of all the fds | |
fd_set allfd_; | |
// get allmax fds | |
int get_max_fd_(); | |
// priovate constructor .. base singleton | |
explicit Gcd(); | |
}; | |
Gcd::Gcd() | |
{ | |
FD_ZERO(&allfd_); | |
sigemptyset(&signal_masks_); | |
signal_fd_ = signalfd(-1, &signal_masks_, 0); | |
if (signal_fd_ < 0) { | |
throw std::system_error(errno, std::generic_category(), "failed to signalfd"); | |
} | |
FD_SET(signal_fd_, &allfd_); | |
} | |
Gcd::~Gcd() | |
{ | |
for (auto it : timers_) { | |
FD_CLR(it.fd_, &allfd_); | |
} | |
for (auto it : sockets_) { | |
FD_CLR(it.fd_, &allfd_); | |
} | |
p_.StopAll(); | |
} | |
void Gcd::Start() | |
{ | |
int ret; | |
for (;;) { | |
int maxfd = get_max_fd_(); | |
fd_set rdfd = allfd_; | |
ret = select(maxfd + 1, &rdfd, nullptr, nullptr, nullptr); | |
if (terminate_) { | |
break; | |
} | |
if (ret < 0) { | |
break; | |
} else { | |
if (FD_ISSET(signal_fd_, &rdfd)) { | |
struct signalfd_siginfo siginfo; | |
ret = read(signal_fd_, &siginfo, sizeof(siginfo)); | |
if (ret < 0) { | |
break; | |
} | |
for (auto it : signals_) { | |
if (it.sig == siginfo.ssi_signo) { | |
for (auto callbacks : it.signal_fn_) { | |
callbacks(it.sig); | |
} | |
break; | |
} | |
} | |
} | |
for (auto it : timers_) { | |
uint64_t val; | |
if (FD_ISSET(it.fd_, &rdfd)) { | |
ret = read(it.fd_, &val, sizeof(val)); | |
if (ret < 0) { | |
break; | |
} | |
it.timer_fn_(); | |
break; | |
} | |
} | |
for (auto it : sockets_) { | |
if (FD_ISSET(it.fd_, &rdfd)) { | |
it.socket_fn_(it.fd_); | |
break; | |
} | |
} | |
} | |
} | |
} | |
int Gcd::get_max_fd_() | |
{ | |
int maxfd = 0; | |
for (auto it : timers_) { | |
if (it.fd_ > maxfd) | |
maxfd = it.fd_; | |
} | |
for (auto it : sockets_) { | |
if (it.fd_ > maxfd) | |
maxfd = it.fd_; | |
} | |
if (signal_fd_ > maxfd) | |
maxfd = signal_fd_; | |
return maxfd; | |
} | |
int Gcd::Create_Timer_Event(int sec, int usec, Timer_Fn timer_fn) noexcept | |
{ | |
Gcd_Timer timer; | |
timer.sec_ = sec; | |
timer.usec_ = usec; | |
timer.fd_ = timerfd_create(CLOCK_MONOTONIC, 0); | |
if (timer.fd_ < 0) { | |
return -1; | |
} | |
timer.timer_fn_ = timer_fn; | |
struct itimerspec ts; | |
ts.it_value.tv_sec = sec; | |
ts.it_value.tv_nsec = usec * 1000ULL; | |
ts.it_interval.tv_sec = sec; | |
ts.it_interval.tv_nsec = usec * 1000ULL; | |
auto ret = timerfd_settime(timer.fd_, 0, &ts, nullptr); | |
if (ret < 0) { | |
close(timer.fd_); | |
return -1; | |
} | |
FD_SET(timer.fd_, &allfd_); | |
timers_.push_back(timer); | |
return 0; | |
} | |
int Gcd::Create_Socket_Event(int fd, Socket_Fn socket_fn) noexcept | |
{ | |
Gcd_Socket sock; | |
sock.fd_ = fd; | |
sock.socket_fn_ = socket_fn; | |
FD_SET(sock.fd_, &allfd_); | |
sockets_.push_back(sock); | |
return 0; | |
} | |
int Gcd::Create_Signal_Event(int sig, Signal_Fn signal_fn) noexcept | |
{ | |
Gcd_Signal ev; | |
std::vector<Gcd_Signal>::iterator it1; | |
for (it1 = signals_.begin(); it1 != signals_.end(); it1 ++) { | |
if (it1->sig == sig) { | |
it1->signal_fn_.push_back(signal_fn); | |
return 0; | |
} | |
} | |
sigaddset(&signal_masks_, sig); | |
FD_CLR(signal_fd_, &allfd_); | |
signal_fd_ = signalfd(signal_fd_, &signal_masks_, 0); | |
if (signal_fd_ < 0) { | |
return -1; | |
} | |
sigprocmask(SIG_BLOCK, &signal_masks_, nullptr); | |
FD_SET(signal_fd_, &allfd_); | |
ev.sig = sig; | |
ev.signal_fn_.push_back(signal_fn); | |
signals_.push_back(ev); | |
return 0; | |
} | |
void Gcd::Run_Execution(Job_Fn job) noexcept | |
{ | |
p_.Queue_Work(job); | |
} | |
/*** | |
* | |
* | |
* | |
* T E S T C O D E | |
* | |
* | |
*/ | |
void execution_point() | |
{ | |
printf("An execution point\n"); | |
} | |
void timer_function() | |
{ | |
struct timespec ts; | |
clock_gettime(CLOCK_REALTIME, &ts); | |
printf("%ld.%ld\n", ts.tv_sec, ts.tv_nsec); | |
Gcd::Instance()->Run_Execution(&execution_point); | |
} | |
void socket_function(int fd) | |
{ | |
char data[3000]; | |
fgets(data, sizeof(data), stdin); | |
printf("data: %s\n", data); | |
} | |
void signal_function(int sig) | |
{ | |
printf("signal %d occured\n", sig); | |
Gcd::Instance()->Terminate(); | |
} | |
int main() | |
{ | |
auto fn = std::bind(&timer_function); | |
auto sock_fn = std::bind(&socket_function, std::placeholders::_1); | |
auto sig_fn = std::bind(&signal_function, std::placeholders::_1); | |
// register timer | |
Gcd::Instance()->Create_Timer_Event(1, 0, fn); | |
// register fd event for 0 (aka stdin) | |
Gcd::Instance()->Create_Socket_Event(0, sock_fn); | |
// resiter for SIGINT | |
Gcd::Instance()->Create_Signal_Event(SIGINT, sig_fn); | |
// start the gcd | |
Gcd::Instance()->Start(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment