Skip to content

Instantly share code, notes, and snippets.

@nilspin
Created October 8, 2019 17:18
Show Gist options
  • Save nilspin/1aae3c3a83110a8dfd225af934b84160 to your computer and use it in GitHub Desktop.
Save nilspin/1aae3c3a83110a8dfd225af934b84160 to your computer and use it in GitHub Desktop.
//g++ -DBUILD_WITH_EASY_PROFILER -g jobSystem.cpp -pthread -leasy_profiler
#include<algorithm>
#include<atomic>
#include<chrono>
#include<condition_variable>
#include<functional>
#include<iostream>
#include<memory>
#include<mutex>
#include<queue>
#include<random>
#include<thread>
#include<vector>
#include<easy/profiler.h>
#include<easy/arbitrary_value.h>
using namespace std;
// A job is a plain function pointer: free functions and captureless lambdas
// convert to it, capturing lambdas do not.
using funcptr = void (*)();

/* ============ shared program state ============ */

// Numbers mutated by add() and erase().
static std::vector<int> myNumbers;
// Guards every access to myNumbers. Unlike the other globals here it is not
// declared `static`.
std::mutex myNumMutex;

// Callables taking one int; setup() fills it with add() and erase().
static std::vector<std::function<void(int)>> function_list;

// Random source: hardware seed -> Mersenne Twister -> integers in [1, 100].
static std::random_device rd;
static std::mt19937 mt(rd());
static std::uniform_int_distribution<int> dist(1, 100);

/* ============================================== */
/// Prints every element of `l` to stdout, each followed by a tab, then a
/// second line reporting how many elements the vector holds.
void printList(const std::vector<int>& l) {
    for (auto it = l.cbegin(); it != l.cend(); ++it) {
        std::cout << *it << "\t";
    }
    std::cout << "\nAnd size is : " << l.size() << "\n";
}
class JobSystem {
vector<thread> pool;
queue<funcptr> job_queue;
condition_variable cv; //to communicate between threads
mutex mu; //to synchronise access to job queue
void consumer() { //thread handler
//EASY_FUNCTION(profiler::colors::Green300);
//Lock over mutex. This can result in two outcomes:
//1. We get the lock and get exclusive access OR
//2. We put ourselves on list with other possible threads for same mutex
unique_lock<mutex> lock(mu);
//EASY_BLOCK("Main loop", profiler::colors::Green400);
do {
// cv.wait requires for us to have lock
cv.wait(lock, [this]{return (job_queue.size() || quit);}); //this overload of wait() prevents spurious wake-ups
//after wait we own the lock
while(!job_queue.empty()) {
//cout<<"Being processed by "<<this_thread::get_id()<<"\n";
auto func = std::move(job_queue.front());
job_queue.pop();
lock.unlock();
func(); //execute the job
//trying to lock again would put this thread back to jobhunting
lock.lock();
}
} while(!quit);//while(!quit && !job_queue.empty())
//EASY_END_BLOCK;
}
public:
int MAX_WORKER_THREADS = 0;
atomic<bool> quit;
JobSystem() {
EASY_FUNCTION(profiler::colors::Red200);
EASY_ARRAY("myNumbers",&myNumbers[0], myNumbers.size(), profiler::colors::Pink500);
quit = false;
MAX_WORKER_THREADS = thread::hardware_concurrency();//3;//
//Important : this cout essentially 'warms up' the ostream so
//successive couts work fine
cout<<"MAX_WORKER_THREADS available : "<<MAX_WORKER_THREADS-1<<"\n";
pool.resize(MAX_WORKER_THREADS -1);
//init threads
for(int i=0;i<pool.size();++i) {
pool[i] = thread(&JobSystem::consumer, this);
cout<<"Thread "<<i<<" init with ID "<<pool[i].get_id()<<"\n";
}
}
~JobSystem() {
EASY_FUNCTION(profiler::colors::Yellow);
quit = true;
cv.notify_all();
cout<<"Joining remaining threads \n";
for(auto& i: pool) {
//if(i.joinable())
{
cout<<"Destroying thread "<<i.get_id()<<"\n";
i.join();
}
}
printList(myNumbers);
cout<<"Exiting.. \n";
}
void dispatch(funcptr op) {
EASY_FUNCTION("Dispatch", profiler::colors::Orange200);
//attempt to get lock on job queue
EASY_BLOCK("wait", profiler::colors::Orange400);
unique_lock<mutex> lock(mu);
EASY_END_BLOCK;
EASY_BLOCK("push job", profiler::colors::Orange400);
job_queue.push(op);
lock.unlock();
//unlock the queue and notify all threads there's new work to do
cv.notify_all();
}
};
/// Appends `a` to the shared myNumbers list while holding myNumMutex.
/// The two profiler blocks separately time the lock acquisition and the
/// insertion.
void add(int a) {
    EASY_FUNCTION(profiler::colors::Cyan300);

    // Acquire exclusive access to the list before touching it.
    EASY_BLOCK("wait", profiler::colors::Cyan700);
    std::lock_guard<std::mutex> guard(myNumMutex);
    EASY_END_BLOCK;

    EASY_BLOCK("emplace_back", profiler::colors::Cyan700);
    myNumbers.push_back(a);
    EASY_END_BLOCK;

    //EASY_BLOCK("cout", profiler::colors::Brown500);
    //cout<<"Number "<<a<<" added to list\n";
    //EASY_END_BLOCK;
}
/// Removes the first element equal to `a` from the shared myNumbers list,
/// holding myNumMutex for the duration. Does nothing when `a` is absent.
void erase(int a) {
    EASY_FUNCTION(profiler::colors::Yellow300);

    // Acquire exclusive access to the list before touching it.
    EASY_BLOCK("wait", profiler::colors::Yellow700);
    std::lock_guard<std::mutex> guard(myNumMutex);
    EASY_END_BLOCK;

    EASY_BLOCK("remove", profiler::colors::Yellow700);
    // No early return here, so the matching EASY_END_BLOCK below is reached
    // whether or not the value was found.
    const auto match = std::find(myNumbers.begin(), myNumbers.end(), a);
    if (match != myNumbers.end()) {
        myNumbers.erase(match);
        //cout<<"Number "<<a<<" removed\n";
    }
    EASY_END_BLOCK;

    //EASY_BLOCK("cout", profiler::colors::Brown500);
    //cout<<"Could not find "<<a<<" in list\n";
    //EASY_END_BLOCK;
}
/// Registers add() and erase() in function_list, in that order, so that
/// index 0 is add() and index 1 is erase().
void setup() {
    EASY_FUNCTION(profiler::colors::Blue200);
    // The previous local rd/mt/dist objects were never used and only
    // shadowed the file-level generator, so they are gone.
    std::cout << "\n Storing functions add() and erase() into list\n";
    function_list.push_back(add);
    function_list.push_back(erase);
}
/// Demo driver: dispatches 100000 jobs that each call add() or erase() with
/// a random value in [1, 100], then one explicit add(98) and erase(98), and
/// finally writes the collected profiler data to "JobSystem.prof".
int main() {
    EASY_MAIN_THREAD;
    EASY_PROFILER_ENABLE;
    //profiler::startListen();
    setup();
    {
        JobSystem system;
        for (int i = 0; i < 100000; ++i) {
            system.dispatch([] {
                // Jobs run on several worker threads at once. Random
                // engines are not safe for concurrent use, so each thread
                // gets its own instead of sharing one global engine.
                thread_local std::mt19937 engine(std::random_device{}());
                thread_local std::uniform_int_distribution<int> pick(1, 100);
                const int randomNumber = pick(engine);
                //call add() or erase() randomly with random values
                const auto& f = function_list[randomNumber % 2];
                f(randomNumber);
            });
        }
        system.dispatch([] { add(98); });
        system.dispatch([] { erase(98); });
        // Leaving this scope runs ~JobSystem, which joins the workers; they
        // keep going until the queue is empty.
    }
    // Dump only after the workers have been joined, so the file covers the
    // whole run instead of a snapshot taken while jobs are still executing.
    profiler::dumpBlocksToFile("JobSystem.prof");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment