Skip to content

Instantly share code, notes, and snippets.

@devendranaga
Last active December 18, 2020 19:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devendranaga/5b23ca2bc3abe3797003c107b0fb8d93 to your computer and use it in GitHub Desktop.
Save devendranaga/5b23ca2bc3abe3797003c107b0fb8d93 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <memory>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
template <class T>
class Message_Queue {
public:
~Message_Queue() {
sub_cb_.empty();
msgs_.empty();
}
static Message_Queue<T> *Instance() {
static Message_Queue<T> t;
return &t;
}
explicit Message_Queue(const Message_Queue &) = delete;
const Message_Queue operator=(const Message_Queue *) = delete;
void Subscribe(std::function<void(T)> &sub_callback) {
sub_cb_.emplace_back(sub_callback);
}
void Push(T &msg) {
std::unique_lock<std::mutex> lock(lock_);
msgs_.push(msg);
cond_.notify_all();
}
private:
std::queue<T> msgs_;
std::vector<std::function<void(T)>> sub_cb_;
std::unique_ptr<std::thread> thr_handle_;
std::mutex lock_;
std::condition_variable cond_;
void Caller_Thread() {
while (1) {
{
std::unique_lock<std::mutex> lock(lock_);
cond_.wait(lock);
}
int q_size = msgs_.size();
while (q_size > 0) {
auto elem = msgs_.front();
for (auto sub : sub_cb_) {
sub(elem);
}
msgs_.pop();
q_size = msgs_.size();
}
}
}
explicit Message_Queue() {
thr_handle_ = std::make_unique<std::thread>(&Message_Queue::Caller_Thread, this);
}
};
/// T E S T C O D E
void Publisher()
{
Message_Queue<int> *q = Message_Queue<int>::Instance();
while (1) {
static int var = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
printf("publish -> %d\n", var);
q->Push(var);
var ++;
}
}
class Subscriber {
public:
explicit Subscriber(const std::string &name): name_(name) { }
void Subscribe() {
Message_Queue<int> *q = Message_Queue<int>::Instance();
std::function<void(int)> callback = std::bind(&Subscriber::Subscribe_Callback, this, std::placeholders::_1);
q->Subscribe(callback);
}
private:
void Subscribe_Callback(int val) {
printf("[%s] received callback val -> %d\n", name_.c_str(), val);
}
std::string name_;
};
int main()
{
Message_Queue<int> *q = Message_Queue<int>::Instance();
Subscriber sub1("sub1");
Subscriber sub2("sub2");
sub1.Subscribe();
sub2.Subscribe();
std::thread pub(Publisher);
pub.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment