Skip to content

Instantly share code, notes, and snippets.

@stephanlachnit
Last active January 19, 2023 17:34
Show Gist options
  • Save stephanlachnit/26cc3eee2a035ac930b8d21ec20a0fa1 to your computer and use it in GitHub Desktop.
Save stephanlachnit/26cc3eee2a035ac930b8d21ec20a0fa1 to your computer and use it in GitHub Desktop.
C++17 blocking queue with timeout and shutdown request
// SPDX-License-Identifier: Unlicense
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <queue>
template <class T, class Container = std::deque<T>>
class blocking_queue {
public:
void push(T const& value) {
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_.push(value);
lock.unlock();
take_condition_.notify_one();
}
template<class... Args>
void emplace(Args&&... args) {
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_.emplace(std::forward<Args>(args)...);
lock.unlock();
take_condition_.notify_one();
}
std::optional<T> take() {
std::unique_lock<std::mutex> lock(queue_mutex_);
take_condition_.wait(lock, [this](){ return terminated_ || !queue_.empty(); });
if (terminated_) {
return std::nullopt;
}
T front {std::move(queue_.front())};
queue_.pop();
return front;
}
template<class Duration>
std::optional<T> take(const Duration& rel_time) {
std::unique_lock<std::mutex> lock(queue_mutex_);
auto predicate_status = take_condition_.wait_for(lock, rel_time, [this](){ return terminated_ || !queue_.empty(); });
if (terminated_ || !predicate_status) {
return std::nullopt;
}
T front {std::move(queue_.front())};
queue_.pop();
return front;
}
typename std::queue<T, Container>::size_type size() {
std::scoped_lock<std::mutex> lock(queue_mutex_);
return queue_.size();
}
bool empty() {
std::scoped_lock<std::mutex> lock(queue_mutex_);
return queue_.empty();
}
void clear() {
std::queue<T, Container>().swap(queue_);
}
void terminate() {
terminated_ = true;
take_condition_.notify_all();
}
void resume() {
terminated_ = false;
}
bool is_terminated() {
return terminated_;
}
private:
std::atomic_bool terminated_ {false};
std::mutex queue_mutex_;
std::condition_variable take_condition_;
std::queue<T, Container> queue_;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment