Skip to content

Instantly share code, notes, and snippets.

@simonlynen
Last active March 18, 2022 03:18
Show Gist options
  • Save simonlynen/efba97276c8b09502a20 to your computer and use it in GitHub Desktop.
Save simonlynen/efba97276c8b09502a20 to your computer and use it in GitHub Desktop.
#ifndef MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_
#define MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_
#include <atomic>
#include <memory>
#include <pthread.h>
#include <queue>
#include <string>
#include <sys/time.h>
#include <glog/logging.h>
#define MULTIAGENT_MAPPING_POINTER_TYPEDEFS(TypeName) \
typedef std::shared_ptr<TypeName> Ptr; \
typedef std::shared_ptr<const TypeName> ConstPtr; \
typedef std::unique_ptr<TypeName> UniquePtr; \
void definePointerTypedefs##__FILE__##__LINE__(void)
namespace common {
class ThreadSafeQueueBase {
public:
MULTIAGENT_MAPPING_POINTER_TYPEDEFS(ThreadSafeQueueBase);
ThreadSafeQueueBase() = default;
virtual ~ThreadSafeQueueBase() {}
virtual void NotifyAll() const = 0;
virtual void Shutdown() = 0;
virtual void Resume() = 0;
virtual size_t Size() const = 0;
virtual bool Empty() const = 0;
};
template <typename QueueType>
class ThreadSafeQueue : public ThreadSafeQueueBase {
friend bool test_funcs(void* (*)(void*), void* (*)(void*), // NOLINT
const std::string&, bool);
public:
MULTIAGENT_MAPPING_POINTER_TYPEDEFS(ThreadSafeQueue);
virtual void NotifyAll() const final {
pthread_cond_broadcast(&condition_empty_);
pthread_cond_broadcast(&condition_full_);
}
ThreadSafeQueue() {
shutdown_ = false;
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&condition_empty_, NULL);
pthread_cond_init(&condition_full_, NULL);
}
virtual ~ThreadSafeQueue() {
shutdown_ = true;
NotifyAll();
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&condition_empty_);
pthread_cond_destroy(&condition_full_);
}
virtual void Shutdown() final {
shutdown_ = true;
NotifyAll();
}
virtual void Resume() final {
shutdown_ = false;
NotifyAll();
}
// Push to the queue.
void Push(const QueueType& value) { PushNonBlocking(value); }
// Push to the queue.
void PushNonBlocking(const QueueType& value) {
pthread_mutex_lock(&mutex_);
queue_.push(value);
pthread_cond_signal(&condition_empty_); // Signal that data is available.
pthread_mutex_unlock(&mutex_);
}
virtual size_t Size() const final {
pthread_mutex_lock(&mutex_);
size_t size = queue_.size();
pthread_mutex_unlock(&mutex_);
return size;
}
virtual bool Empty() const final {
pthread_mutex_lock(&mutex_);
bool empty = queue_.empty();
pthread_mutex_unlock(&mutex_);
return empty;
}
// Push to the queue if the size is less than max_queue_size, else block.
bool PushBlockingIfFull(const QueueType& value, size_t max_queue_size) {
while (!shutdown_) {
pthread_mutex_lock(&mutex_);
size_t size = queue_.size();
if (size >= max_queue_size) {
pthread_cond_wait(&condition_full_, &mutex_);
}
if (size >= max_queue_size) {
pthread_mutex_unlock(&mutex_);
continue;
}
queue_.push(value);
pthread_cond_signal(&condition_empty_); // Signal that data is available.
pthread_mutex_unlock(&mutex_);
return true;
}
return false;
}
// Returns true if oldest was dropped because queue was full.
bool PushNonBlockingDroppingIfFull(const QueueType& value,
size_t max_queue_size) {
pthread_mutex_lock(&mutex_);
bool result = false;
if (queue_.size() >= max_queue_size) {
queue_.pop();
result = true;
}
queue_.push(value);
pthread_cond_signal(&condition_empty_); // Signal that data is available.
pthread_mutex_unlock(&mutex_);
return result;
}
// Pops from the queue blocking if queue is empty.
bool Pop(QueueType* value) { return PopBlocking(value); }
// Pops from the queue blocking if queue is empty.
bool PopBlocking(QueueType* value) {
CHECK_NOTNULL(value);
while (!shutdown_) {
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
pthread_cond_wait(&condition_empty_, &mutex_);
}
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
continue;
}
QueueType _value = queue_.front();
queue_.pop();
pthread_cond_signal(&condition_full_); // Notify that space is available.
pthread_mutex_unlock(&mutex_);
*value = _value;
return true;
}
return false;
}
// Check queue is empty, if yes return false, not altering value. If queue not
// empty update value and return true.
bool PopNonBlocking(QueueType* value) {
CHECK_NOTNULL(value);
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
return false;
}
*value = queue_.front();
queue_.pop();
pthread_mutex_unlock(&mutex_);
return true;
}
// Check queue is empty, if yes return false, not altering value. If queue not
// empty update value and return true.
bool PopTimeout(QueueType* value, int64_t timeout_nanoseconds) {
CHECK_NOTNULL(value);
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
struct timeval tv;
struct timespec ts;
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1e3 + timeout_nanoseconds;
pthread_cond_timedwait(&condition_empty_, &mutex_, &ts);
}
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
return false;
}
QueueType _value = queue_.front();
queue_.pop();
pthread_cond_signal(&condition_full_); // Notify that space is available.
pthread_mutex_unlock(&mutex_);
*value = _value;
return true;
}
// Get a copy of the front element of the queue. Returns false if empty.
bool getCopyOfFront(QueueType* value) {
CHECK_NOTNULL(value);
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
return false;
}
// COPY the value.
*value = queue_.front();
pthread_mutex_unlock(&mutex_);
return true;
}
// Get a copy of the front element of the queue. Returns false if the queue is shut down.
bool getCopyOfFrontBlocking(QueueType* value) {
CHECK_NOTNULL(value);
while (!shutdown_) {
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
pthread_cond_wait(&condition_empty_, &mutex_);
}
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
continue;
}
*value = queue_.front();
pthread_mutex_unlock(&mutex_);
return true;
}
return false;
}
// Get a copy of the back element of the queue. Returns false if empty.
bool getCopyOfBack(QueueType* value) {
CHECK_NOTNULL(value);
pthread_mutex_lock(&mutex_);
if (queue_.empty()) {
pthread_mutex_unlock(&mutex_);
return false;
}
// COPY the value.
*value = queue_.back();
pthread_mutex_unlock(&mutex_);
return true;
}
mutable pthread_mutex_t mutex_;
mutable pthread_cond_t condition_empty_;
mutable pthread_cond_t condition_full_;
std::queue<QueueType> queue_;
std::atomic_bool shutdown_;
};
} // namespace common
#endif // MULTIAGENT_MAPPING_COMMON_THREADSAFE_QUEUE_H_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment