Last active
January 19, 2023 17:34
-
-
Save stephanlachnit/26cc3eee2a035ac930b8d21ec20a0fa1 to your computer and use it in GitHub Desktop.
C++17 blocking queue with timeout and shutdown request
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SPDX-License-Identifier: Unlicense
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
/// @brief Thread-safe FIFO queue whose consumers block until an element is
///        available, a timeout expires, or the queue is terminated.
///
/// All access to the underlying std::queue is serialized by one mutex.
/// The termination flag is atomic so is_terminated() can be read without
/// locking, but it is only ever *written* while the mutex is held; that is
/// what keeps a consumer from missing the wake-up between evaluating the wait
/// predicate and going to sleep.
///
/// @tparam T         Element type.
/// @tparam Container Underlying sequence container handed to std::queue.
template <class T, class Container = std::deque<T>>
class blocking_queue {
public:
    /// Size type of the underlying std::queue.
    using size_type = typename std::queue<T, Container>::size_type;

    /// @brief Appends a copy of @p value and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_.push(value);
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        take_condition_.notify_one();
    }

    /// @brief Appends @p value by moving it and wakes one waiting consumer.
    ///
    /// Lets move-only element types be pushed.
    void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_.push(std::move(value));
        }
        take_condition_.notify_one();
    }

    /// @brief Constructs an element in place from @p args and wakes one
    ///        waiting consumer.
    template <class... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_.emplace(std::forward<Args>(args)...);
        }
        take_condition_.notify_one();
    }

    /// @brief Removes and returns the front element, blocking until one is
    ///        available or the queue is terminated.
    /// @return The front element, or std::nullopt if the queue is terminated
    ///         (even when elements are still queued).
    std::optional<T> take() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        take_condition_.wait(lock, [this] { return terminated_ || !queue_.empty(); });
        return take_locked();
    }

    /// @brief Like take(), but gives up after @p rel_time.
    /// @param rel_time Maximum time to wait (a std::chrono duration).
    /// @return The front element, or std::nullopt on timeout or termination.
    template <class Duration>
    std::optional<T> take(const Duration& rel_time) {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        const bool ready = take_condition_.wait_for(
            lock, rel_time, [this] { return terminated_ || !queue_.empty(); });
        if (!ready) {
            return std::nullopt;  // timed out: still empty and not terminated
        }
        return take_locked();
    }

    /// @brief Number of queued elements at the time of the call.
    size_type size() const {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.size();
    }

    /// @brief Whether the queue held no elements at the time of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.empty();
    }

    /// @brief Discards all queued elements.
    void clear() {
        std::queue<T, Container> discarded;
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_.swap(discarded);
        }
        // `discarded` is destroyed here, so element destructors run without
        // the lock held.
    }

    /// @brief Marks the queue as terminated and wakes every waiting consumer.
    ///
    /// While terminated, both take() overloads return std::nullopt.
    /// Producers may still push; those elements become available again after
    /// resume().
    void terminate() {
        {
            // Set the flag under the mutex: a consumer is then either before
            // its predicate check (and will see the flag) or already waiting
            // (and will receive the notification). No wake-up can be lost.
            std::lock_guard<std::mutex> lock(queue_mutex_);
            terminated_ = true;
        }
        take_condition_.notify_all();
    }

    /// @brief Clears the terminated state so take() delivers elements again.
    void resume() {
        // Locked so the flag cannot change while a consumer sits between its
        // wait predicate and the pop.
        std::lock_guard<std::mutex> lock(queue_mutex_);
        terminated_ = false;
    }

    /// @brief Whether terminate() has been called without a later resume().
    bool is_terminated() const {
        return terminated_;
    }

private:
    /// @brief Pops the front element. The caller must hold queue_mutex_.
    /// @return std::nullopt if the queue is terminated or empty.
    std::optional<T> take_locked() {
        if (terminated_ || queue_.empty()) {
            return std::nullopt;
        }
        std::optional<T> front(std::in_place, std::move(queue_.front()));
        queue_.pop();
        return front;
    }

    std::atomic_bool terminated_ {false};
    mutable std::mutex queue_mutex_;  // mutable: locked by const observers
    std::condition_variable take_condition_;
    std::queue<T, Container> queue_;
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment