Skip to content

Instantly share code, notes, and snippets.

@Richard-W
Created July 23, 2015 17:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Richard-W/cfd3b8875976c371df01 to your computer and use it in GitHub Desktop.
Save Richard-W/cfd3b8875976c371df01 to your computer and use it in GitHub Desktop.
A thread-safe queue whose pop() blocks while no elements are available
/* Copyright 2015, Richard Wiedenhöft <richard@wiedenhoeft.xyz>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include<queue>
#include<mutex>
#include<condition_variable>
#include<utility>
#include<stdexcept>
#include<type_traits>
namespace alpha {

/**
 * @brief Error raised by AsyncQueue once it has been closed
 *
 * AsyncQueue::push() throws it as soon as the queue is closed;
 * AsyncQueue::pop() throws it when the queue is closed and no
 * elements are left to hand out.
 */
class AsyncQueueClosedError : public std::runtime_error {
public:
    AsyncQueueClosedError() : std::runtime_error("The AsyncQueue has been closed.") {}
};

/**
 * @brief A FIFO queue that may be accessed from different threads
 *
 * Every access to the underlying std::queue and to the closed flag is
 * performed while holding an internal mutex. pop() blocks on a condition
 * variable while the queue is empty and has not been closed.
 *
 * The queue is neither copyable nor movable.
 *
 * @tparam T Type of the stored elements
 */
template<typename T>
class AsyncQueue {
private:
    std::queue<T> queue;
    /* mutable so that the const observers empty() and closed() can lock it */
    mutable std::mutex mutex;
    std::condition_variable cond;
    bool _closed = false;

public:
    /* Big 5 */
    AsyncQueue(const AsyncQueue& other) = delete;
    AsyncQueue(AsyncQueue&& other) = delete;
    AsyncQueue& operator=(const AsyncQueue& other) = delete;
    AsyncQueue& operator=(AsyncQueue&& other) = delete;
    virtual ~AsyncQueue() = default;

    /**
     * @brief Default constructor, creates an empty, open queue
     */
    AsyncQueue() = default;

    /**
     * @brief Push a value to the async queue
     *
     * The element is constructed in place from the forwarded argument, so
     * any argument from which a T can be direct-initialized is accepted.
     * One thread waiting in pop() is notified afterwards.
     *
     * @param object Value the new element is constructed from
     * @throws AsyncQueueClosedError if the queue has been closed
     */
    template<typename U>
    void push(U&& object) {
        /* emplace() direct-initializes the element, so constructibility
         * (not implicit convertibility) is the actual requirement. */
        static_assert(
            std::is_constructible<T, U&&>::value,
            "Argument to push is incompatible with the template type of AsyncQueue."
        );
        {
            std::lock_guard<std::mutex> lock(this->mutex);
            if(this->_closed) {
                throw AsyncQueueClosedError();
            }
            this->queue.emplace(std::forward<U>(object));
        }
        /* Notify after releasing the lock so the woken thread does not
         * immediately block on the mutex again. */
        this->cond.notify_one();
    }

    /**
     * @brief Pop a value from the queue
     *
     * Blocks while the queue is empty and not closed. Elements that were
     * pushed before close() can still be popped afterwards.
     *
     * @return The element at the front of the queue
     * @throws AsyncQueueClosedError if the queue is closed and empty
     */
    T pop() {
        std::unique_lock<std::mutex> lock(this->mutex);
        this->cond.wait(lock, [this] {
            return !this->queue.empty() || this->_closed;
        });
        if(this->queue.empty()) {
            /* If the queue is empty at this point it was closed. */
            throw AsyncQueueClosedError();
        }
        T object = std::move(this->queue.front());
        this->queue.pop();
        return object;
    }

    /**
     * @brief Returns if the queue is empty
     *
     * The result is a snapshot taken under the lock; other threads may
     * push or pop right after this call returns.
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock(this->mutex);
        return this->queue.empty();
    }

    /**
     * @brief Close the queue while allowing to pop already inserted elements
     *
     * Wakes all threads currently blocked in pop(). Subsequent calls to
     * push() throw AsyncQueueClosedError.
     */
    void close() {
        {
            std::lock_guard<std::mutex> lock(this->mutex);
            this->_closed = true;
        }
        this->cond.notify_all();
    }

    /**
     * @brief Returns whether the queue is closed
     */
    bool closed() const {
        std::lock_guard<std::mutex> lock(this->mutex);
        return this->_closed;
    }
};

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment