Last active
March 22, 2022 04:09
-
-
Save shawnfeng0/429b6caef85ec0d32f34081374dcdb8a to your computer and use it in GitHub Desktop.
simple queue with mutex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Created by shawnfeng on 2021-08-12. | |
// https://gist.github.com/ShawnFeng0/429b6caef85ec0d32f34081374dcdb8a | |
// | |
#pragma once | |
#include <atomic> | |
#include <condition_variable> | |
#include <mutex> | |
#include <queue> | |
/** | |
* Multi-threaded queue, use mutex to protect the queue | |
* @tparam T Type | |
* @tparam max_size Maximum queue length | |
* | |
* Example: | |
* @code{.cpp} | |
auto int_queue = std::make_shared<MutexQueue<int, 100>>(); | |
// Producer | |
std::thread{[=]() { | |
for (int i = 0; i < 1024; i++) { | |
int_queue->emplace_overwrite_oldest(i); | |
} | |
int_queue->alert_for_exit(); | |
}}.join(); | |
// Consumer | |
std::thread{[=]() { | |
while (true) { | |
int value; | |
// If the waiting fails, it means that the queue has been marked for | |
// cancellation, and it should be disconnected and exited from | |
// consumption | |
if (!int_queue->pop_wait_if_empty(&value)) { | |
std::cout << "The queue has ended." << std::endl; | |
break; | |
} | |
// Otherwise, the data has been fetched from the queue | |
std::cout << value << std::endl; | |
} | |
}}.detach(); | |
* @endcode | |
* | |
*/ | |
template <typename T, size_t max_size> | |
class MutexQueue { | |
static_assert(max_size > 0, "max_size must be > 0"); | |
public: | |
using value_type = T; | |
auto size() { | |
std::unique_lock<std::mutex> lock(mutex_); | |
return queue_.size(); | |
}; | |
auto empty() { | |
std::unique_lock<std::mutex> lock(mutex_); | |
return queue_.size(); | |
}; | |
template <typename Type> | |
void push_overwrite_oldest(Type &&value) { | |
std::unique_lock<std::mutex> lock(mutex_); | |
while (queue_.size() >= max_size) { | |
queue_.pop(); | |
} | |
queue_.push(std::forward<Type>(value)); | |
notifier_.notify_all(); | |
} | |
template <typename... Args> | |
void emplace_overwrite_oldest(Args &&...args) { | |
std::unique_lock<std::mutex> lock(mutex_); | |
while (queue_.size() >= max_size) { | |
queue_.pop(); | |
} | |
queue_.emplace(std::forward<Args>(args)...); | |
notifier_.notify_all(); | |
} | |
bool pop_if_not_empty(T *value) { | |
std::unique_lock<std::mutex> lock(mutex_); | |
if (queue_.empty()) { | |
return false; | |
} | |
if (value) { | |
*value = queue_.front(); | |
} | |
queue_.pop(); | |
return true; | |
} | |
/** | |
* Wait until there is data or an alert | |
* @param value Data pointer to be filled in the queue | |
* @return true Wait for the end, data is available | |
* @return false The queue publisher has issued an alert and may need to end | |
*/ | |
bool pop_wait_if_empty(T *value) { | |
std::unique_lock<std::mutex> lock(mutex_); | |
/// If the queue is empty, wait until the queue is not empty | |
notifier_.wait(lock, [this] { return !queue_.empty() || alert_for_exit_; }); | |
if (!queue_.empty()) { | |
if (value) { | |
*value = std::move(queue_.front()); | |
} | |
queue_.pop(); | |
return true; | |
} else { | |
/// For alert_for_exit_ | |
return false; | |
} | |
} | |
bool is_alert() { return alert_for_exit_; } | |
/** | |
* Use condition variables to notify threads blocked by the queue that the | |
* queue has ended | |
*/ | |
void alert_for_exit() { | |
std::unique_lock<std::mutex> lock(mutex_); | |
alert_for_exit_ = true; | |
notifier_.notify_all(); | |
} | |
private: | |
std::queue<T> queue_; | |
std::mutex mutex_; | |
/** | |
* @see alert_for_exit | |
*/ | |
std::atomic<bool> alert_for_exit_{false}; | |
/** | |
* When data is written, the queue is not empty and a notification is sent | |
*/ | |
std::condition_variable notifier_; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment