Skip to content

Instantly share code, notes, and snippets.

@devendranaga
Created October 8, 2019 16:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devendranaga/07be345518682e19331e00fa1aeac9ed to your computer and use it in GitHub Desktop.
Save devendranaga/07be345518682e19331e00fa1aeac9ed to your computer and use it in GitHub Desktop.
Grand Central Dispatch in C++
/**
* @description GCD in C++
*
* @copyright 2019-present Devendra Naga (devnaga@tuta.io) All rights reserved
*/
#include <signal.h>
#include <sys/select.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <cerrno>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
// Callback invoked when a timer expires; takes no arguments.
using Timer_Fn = std::function<void()>;
// Callback invoked for a ready descriptor; receives that descriptor.
using Socket_Fn = std::function<void(int)>;
// Callback invoked for a delivered signal; receives the signal number.
using Signal_Fn = std::function<void(int)>;
// Unit of work handed to a worker thread; takes no arguments.
using Job_Fn = std::function<void()>;
// One worker of the Parallel pool: a thread handle, a FIFO of pending jobs,
// and the mutex / condition variable / stop flag that coordinate them.
class Parallel_Proc {
    public:
        explicit Parallel_Proc();
        ~Parallel_Proc();

        // Hand a job to this worker.
        void Queue_Work(Job_Fn job);

        // Raise the stop flag while holding lock_, log the request and
        // notify one waiter on cv_.
        void Stop()
        {
            std::lock_guard<std::mutex> guard(lock_);
            stop_signal_ = true;
            printf("Gcd: stop the process\n");
            cv_.notify_one();
        }

    private:
        // Entry point of the worker thread.
        void Process();

        // Handle of the worker thread.
        std::unique_ptr<std::thread> proc_;
        // Jobs waiting to be run, oldest first.
        std::queue<Job_Fn> jobs_;
        // Guards jobs_ and stop_signal_.
        std::mutex lock_;
        // Signalled by Queue_Work() and Stop().
        std::condition_variable cv_;
        // Set by Stop().
        bool stop_signal_{false};
};
// Append a job to this worker's queue and wake the worker.
//
// @param job callable to run on the worker thread; taken by value and moved
//            into the queue so the std::function is not copied a second time
//            while the lock is held.
void Parallel_Proc::Queue_Work(Job_Fn job)
{
    {
        std::lock_guard<std::mutex> guard(lock_);
        jobs_.push(std::move(job));
    }
    // Notify after releasing the lock so the woken thread can take it at once.
    cv_.notify_one();
}
void Parallel_Proc::Process()
{
sigset_t block;
// block termination signals ..temporarily
sigemptyset(&block);
sigaddset(&block, SIGINT);
sigprocmask(SIG_BLOCK, &block, 0);
while (1) {
Job_Fn job_to_do = nullptr;
{
std::unique_lock<std::mutex> lock(lock_);
cv_.wait(lock);
if (stop_signal_) {
break;
}
job_to_do = jobs_.front();
jobs_.pop();
}
if (job_to_do)
job_to_do();
}
}
Parallel_Proc::Parallel_Proc()
{
proc_ = std::make_unique<std::thread>(&Parallel_Proc::Process, this);
proc_->detach();
}
Parallel_Proc::~Parallel_Proc()
{
}
class Parallel {
public:
explicit Parallel();
~Parallel();
void Queue_Work(Job_Fn job);
void StopAll()
{
for (auto it : procs_) {
it->Stop();
}
}
private:
std::vector<std::shared_ptr<Parallel_Proc>> procs_;
int n_threads_;
};
// Pick a worker with rand() modulo the worker count and queue the job on it.
void Parallel::Queue_Work(Job_Fn job)
{
    const int slot = rand() % n_threads_;
    auto &worker = procs_[slot];
    worker->Queue_Work(job);
}
// Create one worker per hardware thread and seed rand() for worker selection.
//
// std::thread::hardware_concurrency() may return 0 when the value cannot be
// determined; at least one worker is always created so that Queue_Work()
// never computes `rand() % 0` or indexes an empty pool.
Parallel::Parallel()
{
    // Query once: the value is used for both the count and the loop bound.
    const unsigned int detected = std::thread::hardware_concurrency();
    n_threads_ = (detected == 0) ? 1 : static_cast<int>(detected);
    printf("Gcd: create [%d] threads for work\n", n_threads_);

    procs_.reserve(static_cast<std::size_t>(n_threads_));
    for (int i = 0; i < n_threads_; ++i) {
        procs_.push_back(std::make_shared<Parallel_Proc>());
    }

    // Seed from the wall clock. The values are converted to unsigned before
    // mixing, so no signed arithmetic can overflow.
    struct timespec ts = {};
    clock_gettime(CLOCK_REALTIME, &ts);
    srand(static_cast<unsigned int>(ts.tv_sec) ^ static_cast<unsigned int>(ts.tv_nsec));
}

// Nothing to do here: the workers are released when procs_ is destroyed.
Parallel::~Parallel()
{
}
// One registered periodic timer.
// Scalar members carry defaults so a default-constructed record never holds
// indeterminate values; -1 marks "no descriptor".
struct Gcd_Timer {
    // Period, seconds part, as passed at registration.
    int sec_ = 0;
    // Period, microseconds part, as passed at registration.
    int usec_ = 0;
    // timerfd descriptor backing this timer.
    int fd_ = -1;
    // Callback run when the timer fires.
    Timer_Fn timer_fn_;
};
// One registered descriptor and the callback to run when it is readable.
// fd_ defaults to -1 ("no descriptor") instead of an indeterminate value.
struct Gcd_Socket {
    // Descriptor supplied by the caller.
    int fd_ = -1;
    // Callback run with fd_ when the descriptor is reported ready.
    Socket_Fn socket_fn_;
};
// One signal number and every callback registered for it.
// sig defaults to 0 (not a valid signal number) instead of an indeterminate
// value.
struct Gcd_Signal {
    // Signal number.
    int sig = 0;
    // Callbacks, in registration order.
    std::vector<Signal_Fn> signal_fn_;
};
// Event dispatcher: a singleton holding registered timers, descriptors and
// signals, plus a Parallel pool for queued jobs.
class Gcd {
    public:
        // Access the single instance (a function-local static).
        static Gcd *Instance()
        {
            static Gcd instance;
            return &instance;
        }

        ~Gcd();

        // Register a timer with the given seconds / microseconds and callback.
        int Create_Timer_Event(int sec, int usec, Timer_Fn timer_fn) noexcept;

        // Register a descriptor and the callback to run for it.
        int Create_Socket_Event(int fd, Socket_Fn socket_fn) noexcept;

        // Register a callback for a signal number.
        int Create_Signal_Event(int sig, Signal_Fn signal_fn) noexcept;

        // Queue a job on the parallel pool.
        void Run_Execution(Job_Fn job) noexcept;

        // Run the dispatch loop.
        void Start();

        // Ask the dispatch loop to stop.
        void Terminate() { terminate_ = true; }

    private:
        // Registered timers.
        std::vector<Gcd_Timer> timers_;
        // Registered descriptors.
        std::vector<Gcd_Socket> sockets_;
        // Registered signals.
        std::vector<Gcd_Signal> signals_;
        // Pool that runs queued jobs.
        Parallel p_;
        // Becomes true once Terminate() has been called.
        bool terminate_ = false;
        // Descriptor through which signals are read.
        int signal_fd_;
        // Mask of all registered signals.
        sigset_t signal_masks_;
        // Set of all descriptors that are watched.
        fd_set allfd_;

        // Highest descriptor number currently in use.
        int get_max_fd_();

        // Private constructor: instances are obtained through Instance().
        explicit Gcd();
};
// Start with an empty signal mask and descriptor set, create the signalfd
// for that empty mask and add it to the watched set.
//
// @throws std::system_error carrying errno when signalfd() fails.
Gcd::Gcd()
{
    sigemptyset(&signal_masks_);
    FD_ZERO(&allfd_);

    const int created = signalfd(-1, &signal_masks_, 0);
    signal_fd_ = created;
    if (created < 0) {
        throw std::system_error(errno, std::generic_category(), "failed to signalfd");
    }

    FD_SET(signal_fd_, &allfd_);
}
// Release the descriptors this object created and stop the worker pool.
//
// Timer descriptors (created by timerfd_create in Create_Timer_Event) and the
// signalfd are closed here; previously they were only removed from the fd set
// and leaked. Descriptors passed to Create_Socket_Event belong to the caller
// and are left open.
Gcd::~Gcd()
{
    for (const auto &timer : timers_) {
        FD_CLR(timer.fd_, &allfd_);
        close(timer.fd_);
    }
    for (const auto &sock : sockets_) {
        FD_CLR(sock.fd_, &allfd_);
    }
    if (signal_fd_ >= 0) {
        FD_CLR(signal_fd_, &allfd_);
        close(signal_fd_);
        signal_fd_ = -1;
    }
    p_.StopAll();
}
void Gcd::Start()
{
int ret;
for (;;) {
int maxfd = get_max_fd_();
fd_set rdfd = allfd_;
ret = select(maxfd + 1, &rdfd, nullptr, nullptr, nullptr);
if (terminate_) {
break;
}
if (ret < 0) {
break;
} else {
if (FD_ISSET(signal_fd_, &rdfd)) {
struct signalfd_siginfo siginfo;
ret = read(signal_fd_, &siginfo, sizeof(siginfo));
if (ret < 0) {
break;
}
for (auto it : signals_) {
if (it.sig == siginfo.ssi_signo) {
for (auto callbacks : it.signal_fn_) {
callbacks(it.sig);
}
break;
}
}
}
for (auto it : timers_) {
uint64_t val;
if (FD_ISSET(it.fd_, &rdfd)) {
ret = read(it.fd_, &val, sizeof(val));
if (ret < 0) {
break;
}
it.timer_fn_();
break;
}
}
for (auto it : sockets_) {
if (FD_ISSET(it.fd_, &rdfd)) {
it.socket_fn_(it.fd_);
break;
}
}
}
}
}
int Gcd::get_max_fd_()
{
int maxfd = 0;
for (auto it : timers_) {
if (it.fd_ > maxfd)
maxfd = it.fd_;
}
for (auto it : sockets_) {
if (it.fd_ > maxfd)
maxfd = it.fd_;
}
if (signal_fd_ > maxfd)
maxfd = signal_fd_;
return maxfd;
}
// Create a periodic timerfd timer and register its callback.
//
// @param sec      period, whole seconds (must be >= 0)
// @param usec     period, microseconds (must be >= 0); values of one second
//                 or more are carried into the seconds field instead of being
//                 rejected by the kernel as an invalid tv_nsec
// @param timer_fn callback to run on each expiration; must not be empty
// @return 0 on success, -1 on invalid arguments, on timerfd failure, when the
//         descriptor cannot be used with select() (>= FD_SETSIZE), or when
//         the registration cannot be stored.
int Gcd::Create_Timer_Event(int sec, int usec, Timer_Fn timer_fn) noexcept
{
    constexpr int kUsecPerSec = 1000000;

    if (sec < 0 || usec < 0 || !timer_fn) {
        return -1;
    }

    // First expiration and interval are both one period.
    struct itimerspec ts;
    ts.it_value.tv_sec = static_cast<time_t>(sec) + usec / kUsecPerSec;
    ts.it_value.tv_nsec = static_cast<long>(usec % kUsecPerSec) * 1000L;
    ts.it_interval = ts.it_value;

    const int fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd < 0) {
        return -1;
    }
    // FD_SET on a descriptor >= FD_SETSIZE writes outside the fd_set.
    if (fd >= FD_SETSIZE || timerfd_settime(fd, 0, &ts, nullptr) < 0) {
        close(fd);
        return -1;
    }

    // The function is noexcept: turn an allocation failure into an error
    // return instead of std::terminate, and do not leak the descriptor.
    try {
        Gcd_Timer timer;
        timer.sec_ = sec;
        timer.usec_ = usec;
        timer.fd_ = fd;
        timer.timer_fn_ = std::move(timer_fn);
        timers_.push_back(std::move(timer));
    } catch (...) {
        close(fd);
        return -1;
    }

    FD_SET(fd, &allfd_);
    return 0;
}
// Register a caller-owned descriptor and the callback to run when select()
// reports it readable.
//
// @param fd        descriptor to watch; must be in [0, FD_SETSIZE) because
//                  FD_SET on anything else writes outside the fd_set
// @param socket_fn callback invoked with fd; must not be empty
// @return 0 on success, -1 on an unusable descriptor, an empty callback, or
//         when the registration cannot be stored.
int Gcd::Create_Socket_Event(int fd, Socket_Fn socket_fn) noexcept
{
    if (fd < 0 || fd >= FD_SETSIZE || !socket_fn) {
        return -1;
    }

    // Store first, watch second: if storing throws, the fd set is untouched
    // and the noexcept contract is kept by reporting an error.
    try {
        Gcd_Socket sock;
        sock.fd_ = fd;
        sock.socket_fn_ = std::move(socket_fn);
        sockets_.push_back(std::move(sock));
    } catch (...) {
        return -1;
    }

    FD_SET(fd, &allfd_);
    return 0;
}
// Register a callback for a signal, delivered through the signalfd.
//
// If the signal is already registered the callback is appended to its list.
// Otherwise the signal is blocked in the calling thread and added to the
// signalfd mask. The signal is blocked BEFORE the signalfd is updated so it
// cannot be handled by its default disposition in between. On any failure the
// previous signalfd, signal mask and registrations are left as they were.
//
// @param sig       signal number
// @param signal_fn callback invoked with the signal number; must not be empty
// @return 0 on success, -1 on an empty callback, an invalid signal number,
//         or a failure to block the signal or update the signalfd.
int Gcd::Create_Signal_Event(int sig, Signal_Fn signal_fn) noexcept
{
    if (!signal_fn) {
        return -1;
    }

    // Everything that can throw happens here, before signal state changes.
    try {
        for (auto &registered : signals_) {
            if (registered.sig == sig) {
                registered.signal_fn_.push_back(std::move(signal_fn));
                return 0;
            }
        }
        Gcd_Signal ev;
        ev.sig = sig;
        ev.signal_fn_.push_back(std::move(signal_fn));
        signals_.push_back(std::move(ev));
    } catch (...) {
        return -1;
    }

    // Work on a copy so signal_masks_ is only updated on success.
    sigset_t wanted = signal_masks_;
    if (sigaddset(&wanted, sig) < 0) {
        signals_.pop_back();
        return -1;
    }

    sigset_t previous;
    if (pthread_sigmask(SIG_BLOCK, &wanted, &previous) != 0) {
        signals_.pop_back();
        return -1;
    }

    const int fd = signalfd(signal_fd_, &wanted, 0);
    if (fd < 0) {
        // Keep the existing signal_fd_; undo the block and the registration.
        pthread_sigmask(SIG_SETMASK, &previous, nullptr);
        signals_.pop_back();
        return -1;
    }

    if (signal_fd_ >= 0) {
        FD_CLR(signal_fd_, &allfd_);
    }
    signal_fd_ = fd;
    FD_SET(signal_fd_, &allfd_);
    signal_masks_ = wanted;
    return 0;
}
// Forward the job to the parallel pool.
void Gcd::Run_Execution(Job_Fn job) noexcept
{
    Parallel &pool = p_;
    pool.Queue_Work(job);
}
/***
*
*
*
* T E S T C O D E
*
*
*/
// Sample job: write a fixed line to stdout.
void execution_point()
{
    fputs("An execution point\n", stdout);
}
// Sample timer callback: print the current wall-clock time as
// "<seconds>.<nanoseconds>" and queue execution_point on the pool.
//
// The nanosecond field is zero-padded to nine digits; without the padding
// 5 s + 1234 ns would print as "5.1234", which reads as a different time.
void timer_function()
{
    struct timespec ts = {};
    clock_gettime(CLOCK_REALTIME, &ts);
    printf("%ld.%09ld\n", static_cast<long>(ts.tv_sec), static_cast<long>(ts.tv_nsec));
    Gcd::Instance()->Run_Execution(&execution_point);
}
void socket_function(int fd)
{
char data[3000];
fgets(data, sizeof(data), stdin);
printf("data: %s\n", data);
}
void signal_function(int sig)
{
printf("signal %d occured\n", sig);
Gcd::Instance()->Terminate();
}
int main()
{
auto fn = std::bind(&timer_function);
auto sock_fn = std::bind(&socket_function, std::placeholders::_1);
auto sig_fn = std::bind(&signal_function, std::placeholders::_1);
// register timer
Gcd::Instance()->Create_Timer_Event(1, 0, fn);
// register fd event for 0 (aka stdin)
Gcd::Instance()->Create_Socket_Event(0, sock_fn);
// resiter for SIGINT
Gcd::Instance()->Create_Signal_Event(SIGINT, sig_fn);
// start the gcd
Gcd::Instance()->Start();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment