Skip to content

Instantly share code, notes, and snippets.

@shawnfeng0
Last active March 22, 2022 04:09
Show Gist options
  • Save shawnfeng0/429b6caef85ec0d32f34081374dcdb8a to your computer and use it in GitHub Desktop.
Save shawnfeng0/429b6caef85ec0d32f34081374dcdb8a to your computer and use it in GitHub Desktop.
simple queue with mutex
//
// Created by shawnfeng on 2021-08-12.
// https://gist.github.com/ShawnFeng0/429b6caef85ec0d32f34081374dcdb8a
//
#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
/**
* Multi-threaded queue, use mutex to protect the queue
* @tparam T Type
* @tparam max_size Maximum queue length
*
* Example:
* @code{.cpp}
auto int_queue = std::make_shared<MutexQueue<int, 100>>();
// Producer
std::thread{[=]() {
for (int i = 0; i < 1024; i++) {
int_queue->emplace_overwrite_oldest(i);
}
int_queue->alert_for_exit();
}}.join();
// Consumer
std::thread{[=]() {
while (true) {
int value;
// If the waiting fails, it means that the queue has been marked for
// cancellation, and it should be disconnected and exited from
// consumption
if (!int_queue->pop_wait_if_empty(&value)) {
std::cout << "The queue has ended." << std::endl;
break;
}
// Otherwise, the data has been fetched from the queue
std::cout << value << std::endl;
}
}}.detach();
* @endcode
*
*/
template <typename T, size_t max_size>
class MutexQueue {
static_assert(max_size > 0, "max_size must be > 0");
public:
using value_type = T;
auto size() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
};
auto empty() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
};
template <typename Type>
void push_overwrite_oldest(Type &&value) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.size() >= max_size) {
queue_.pop();
}
queue_.push(std::forward<Type>(value));
notifier_.notify_all();
}
template <typename... Args>
void emplace_overwrite_oldest(Args &&...args) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.size() >= max_size) {
queue_.pop();
}
queue_.emplace(std::forward<Args>(args)...);
notifier_.notify_all();
}
bool pop_if_not_empty(T *value) {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
if (value) {
*value = queue_.front();
}
queue_.pop();
return true;
}
/**
* Wait until there is data or an alert
* @param value Data pointer to be filled in the queue
* @return true Wait for the end, data is available
* @return false The queue publisher has issued an alert and may need to end
*/
bool pop_wait_if_empty(T *value) {
std::unique_lock<std::mutex> lock(mutex_);
/// If the queue is empty, wait until the queue is not empty
notifier_.wait(lock, [this] { return !queue_.empty() || alert_for_exit_; });
if (!queue_.empty()) {
if (value) {
*value = std::move(queue_.front());
}
queue_.pop();
return true;
} else {
/// For alert_for_exit_
return false;
}
}
bool is_alert() { return alert_for_exit_; }
/**
* Use condition variables to notify threads blocked by the queue that the
* queue has ended
*/
void alert_for_exit() {
std::unique_lock<std::mutex> lock(mutex_);
alert_for_exit_ = true;
notifier_.notify_all();
}
private:
std::queue<T> queue_;
std::mutex mutex_;
/**
* @see alert_for_exit
*/
std::atomic<bool> alert_for_exit_{false};
/**
* When data is written, the queue is not empty and a notification is sent
*/
std::condition_variable notifier_;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment