Skip to content

Instantly share code, notes, and snippets.

@devendranaga
Created November 4, 2021 12:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devendranaga/7e0f129e810a310e86ea4e4a9d5dc2c6 to your computer and use it in GitHub Desktop.
Save devendranaga/7e0f129e810a310e86ea4e4a9d5dc2c6 to your computer and use it in GitHub Desktop.
safe_queue
/**
* @brief - implement Safe Queue technique for passing messages between various threads / classes
*
* @copyright - 2021-present All rights reserved
*
* @author - Devendra Naga (devendra.aaru@outlook.com)
*
* @license - proprietary license, ask author for more information
*/
#ifndef __SAFE_QUEUE_H__
#define __SAFE_QUEUE_H__
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
namespace Auto_OS::Lib {

/**
* @brief - internal information regarding subscriber
*/
template <typename T>
struct Subscriber_Info {
    // id of the subscriber - passed back as the first callback argument
    int sub_id;
    // receive callback to be called by the dispatcher thread
    std::function<void(int, T)> receive_callback;
};

/**
* @brief - implements SafeQueue singleton class
*
* One instance exists per item type T. Items pushed with QueueItem() are
* delivered, in FIFO order, to every registered subscriber by a single
* background dispatcher thread owned by the instance.
*
* NOTE: subscriber callbacks run on the dispatcher thread while the internal
* mutex is held. A callback must therefore not call QueueItem(), Subscribe()
* or Unsubscribe() on the same queue, otherwise it deadlocks.
*/
template <typename T>
class SafeQueue {
    public:
        /**
        * @brief - stop the dispatcher thread and wait for it to finish
        *
        * The thread is joined (not detached) so that it can never touch the
        * mutex / queue / subscriber list after they have been destroyed.
        * Items still pending when the stop is requested are discarded.
        */
        ~SafeQueue()
        {
            {
                std::lock_guard<std::mutex> guard(lock_);
                stop_ = true;
            }
            cond_.notify_all();
            if (dispatch_thr_.joinable()) {
                dispatch_thr_.join();
            }
        }

        // singleton: neither copyable nor movable
        SafeQueue(const SafeQueue &) = delete;
        SafeQueue &operator=(const SafeQueue &) = delete;
        SafeQueue(SafeQueue &&) = delete;
        SafeQueue &operator=(SafeQueue &&) = delete;

        /**
        * @brief - get instance of SafeQueue object
        *
        * @return pointer to statically created safe queue class
        */
        static SafeQueue *Instance()
        {
            static SafeQueue q;
            return &q;
        }

        /**
        * @brief - Queue item into the Safe queue
        *
        * @param in item - item to copy into the queue; the dispatcher thread
        *                  is woken up to deliver it
        */
        void QueueItem(const T &item)
        {
            {
                std::lock_guard<std::mutex> guard(lock_);
                items_.emplace(item);
            }
            // notify after releasing the lock so the woken thread does not
            // immediately block on the mutex again
            cond_.notify_one();
        }

        /**
        * @brief - subscribe for the publisher data
        *
        * @param in sub_id - subscriber id
        * @param in rx_cb - callback to be called when the data is available
        */
        void Subscribe(int sub_id, std::function<void(int, T)> rx_cb)
        {
            Subscriber_Info<T> info;

            info.sub_id = sub_id;
            info.receive_callback = rx_cb;

            std::lock_guard<std::mutex> guard(lock_);
            subscribers_.emplace_back(info);
        }

        /**
        * @brief - unsubscribe from the publisher
        *
        * Removes the first subscriber registered with the given id. The
        * subscriber list is modified under the same mutex the dispatcher
        * holds while delivering, so once this returns 0 the removed callback
        * is not invoked again.
        *
        * @param in id - id of the subscriber
        *
        * @return 0 if a subscriber was removed, -1 if the id is not registered
        */
        int Unsubscribe(int id)
        {
            std::lock_guard<std::mutex> guard(lock_);

            for (auto it = subscribers_.begin(); it != subscribers_.end(); ++it) {
                if (it->sub_id == id) {
                    subscribers_.erase(it);
                    return 0;
                }
            }

            return -1;
        }

    private:
        /**
        * @brief - constructor
        */
        explicit SafeQueue()
        {
            // create and run dispatcher thread that will wait on the queue and call subscribers.
            // started from the constructor body so every member it uses is already initialised.
            dispatch_thr_ = std::thread(&SafeQueue::Dispatcher, this);
        }

        std::mutex lock_;
        std::condition_variable cond_;
        std::queue<T> items_;
        std::vector<Subscriber_Info<T>> subscribers_;
        // set (under lock_) by the destructor to ask the dispatcher to exit
        bool stop_ = false;
        std::thread dispatch_thr_;

        /**
        * @brief - implements dispatcher
        *
        * Sleeps until there is at least one queued item (or a stop request),
        * then hands every queued item to every subscriber in FIFO order.
        */
        void Dispatcher()
        {
            std::unique_lock<std::mutex> guard(lock_);

            while (true) {
                // the predicate covers items queued before this thread started
                // waiting (no lost wakeup) and filters spurious wakeups
                cond_.wait(guard, [this] { return stop_ || !items_.empty(); });
                if (stop_) {
                    return;
                }

                while (!items_.empty()) {
                    const T &item = items_.front();

                    for (const auto &sub : subscribers_) {
                        // an empty std::function would throw std::bad_function_call
                        // and terminate the process from this thread - skip it
                        if (sub.receive_callback) {
                            sub.receive_callback(sub.sub_id, item);
                        }
                    }
                    items_.pop();
                }
            }
        }
};

}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment