@tarqd
Last active August 29, 2015 13:57
Simple Standalone Single Producer Single Consumer Queue (Based on Facebook/Folly)
// based on facebook/folly/ProducerConsumerQueue.h
// main difference is the inclusion of a pop_all() method
// and more stl-ish method names
// and it uses a vector for the underlying storage (which lets us leave out lots of manual cleanup/memory management)
// this is fine because we never resize the vector
// also, the capacity parameter is the usable capacity; the class automatically allocates one extra slot for the dummy value
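// rough sketch of the capacity semantics described above:
//   spsc_queue<int> q(16); // underlying buffer holds 17 slots (16 usable + 1 dummy)
//   q.capacity();          // 16, the usable capacity
//   q.max_size();          // also 16, the queue never grows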
// original copyright:
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
#ifndef SPSC_QUEUE_H_
#define SPSC_QUEUE_H_
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <type_traits>
#include <utility>
#include <vector>

// LIKELY() is used below as a branch-prediction hint (folly provides it);
// define a fallback here so the header is standalone
#ifndef LIKELY
#if defined(__GNUC__) || defined(__clang__)
#define LIKELY(x) __builtin_expect(!!(x), 1)
#else
#define LIKELY(x) (x)
#endif
#endif
template <class T>
class spsc_queue {
public:
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
using value_type = typename std::decay<T>::type;
using pointer = typename std::add_pointer<value_type>::type;
using const_pointer = typename std::add_pointer<const value_type>::type;
using reference = typename std::add_lvalue_reference<value_type>::type;
using const_reference = typename std::add_lvalue_reference<const value_type>::type;
spsc_queue(size_type capacity = 16) :
m_write_index(0),
m_read_index(0),
m_capacity(capacity+1), // +1 capacity for dummy value that indicates "empty"
m_buffer(m_capacity)
{
}
bool try_push(const_reference value) {
auto const write_index = m_write_index.load(std::memory_order_relaxed);
auto next_index = write_index + 1;
if (next_index == m_capacity) {
next_index = 0;
}
if (next_index != m_read_index.load(std::memory_order_acquire)) {
m_buffer[write_index] = value;
m_write_index.store(next_index, std::memory_order_release);
return true;
}
// queue is full
return false;
}
// same as above but takes an rvalue reference (doesn't work with lambdas :( )
bool try_push(value_type&& value) {
auto const write_index = m_write_index.load(std::memory_order_relaxed);
auto next_index = write_index + 1;
if (next_index == m_capacity) {
next_index = 0;
}
if (next_index != m_read_index.load(std::memory_order_acquire)) {
m_buffer[write_index] = std::move(value);
m_write_index.store(next_index, std::memory_order_release);
return true;
}
// queue is full
return false;
}
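// producer-side sketch (assumes a queue of std::string shared with one consumer):
//   spsc_queue<std::string> q(64);
//   std::string msg = "hello";
//   q.try_push(msg);            // copies via the const-reference overload
//   q.try_push(std::move(msg)); // moves via the rvalue overload
//   // both return false instead of blocking when the queue is full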
// try and pop an element off the queue
// uses 2 atomic reads and 1 atomic write
// generally you'll want to use pop_all() instead
bool try_pop(reference value) {
auto const read_index = m_read_index.load(std::memory_order_relaxed);
if (read_index == m_write_index.load(std::memory_order_acquire)) {
// queue is empty
return false;
}
auto next_index = read_index + 1;
if (next_index == m_capacity) {
next_index = 0;
}
value = std::move(m_buffer[read_index]);
m_read_index.store(next_index, std::memory_order_release);
return true;
}
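// consumer-side sketch for popping a single element (q is the same spsc_queue<std::string>):
//   std::string msg;
//   if (q.try_pop(msg)) { /* msg now holds the oldest element */ }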
// use this for message queues and the like
// copies the current elements into a vector and returns it
// only uses 2 atomic reads and 1 atomic write for N elements
// whereas using try_pop() in a loop you'd use 2N atomic reads and N atomic writes
// plus it's faster to move all the elements out of the underlying buffer since
// they'll likely be in cache already. should be Fast Enough (TM) for most message queues
std::vector<value_type> pop_all() {
std::vector<value_type> value;
using std::make_move_iterator;
auto const read_index = m_read_index.load(std::memory_order_relaxed);
auto const write_index = m_write_index.load(std::memory_order_acquire);
if (read_index == write_index) {
// queue is empty
return value;
}
const bool is_wrapped = write_index < read_index;
auto const buffer_begin = m_buffer.begin();
auto const read_begin = buffer_begin + read_index;
auto const read_end = buffer_begin + write_index;
// fast case, linear copy
if (LIKELY(!is_wrapped)) {
assert(read_begin < read_end);
assert(read_end <= m_buffer.end());
value.insert(value.begin(), make_move_iterator(read_begin), make_move_iterator(read_end));
} else {
// slow case, copy to end and then wrap around
// technically we could check if the wrap is false (write_index == 0)
// but additional branching would slow down the fast path
auto const buffer_end = m_buffer.end();
size_type initial_size = std::distance(read_begin, buffer_end);
size_type total_size = initial_size + std::distance(buffer_begin, read_end);
value.reserve(total_size);
assert(read_begin < buffer_end);
value.insert(value.begin(), make_move_iterator(read_begin), make_move_iterator(buffer_end));
assert(buffer_begin <= read_end);
assert(read_end <= buffer_end);
value.insert(value.end(), make_move_iterator(buffer_begin), make_move_iterator(read_end));
assert(value.size() == total_size);
}
// we've emptied the queue (as far as we know)
m_read_index.store(write_index, std::memory_order_release);
return value;
}
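// consumer-side sketch for draining in one batch, as recommended above
// (handle() is a hypothetical message handler):
//   for (auto& msg : q.pop_all()) {
//     handle(msg);
//   }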
bool is_empty() const {
return m_write_index.load(std::memory_order_consume) == m_read_index.load(std::memory_order_consume);
}
bool is_full() const {
auto next_index = m_write_index.load(std::memory_order_consume) + 1;
if (next_index == m_capacity) {
next_index = 0;
}
if (next_index != m_read_index.load(std::memory_order_consume)) {
return false;
}
return true;
}
// obviously this is an estimate if you're concurrently writing to the queue
// can be stupidly inaccurate if you're using it from a producer
size_type size() const {
auto const write_index = m_write_index.load(std::memory_order_consume);
auto const read_index = m_read_index.load(std::memory_order_consume);
// avoid the unsigned wrap-around from subtracting a larger read index
return write_index >= read_index ? write_index - read_index : write_index + m_capacity - read_index;
}
// usable capacity is one less than the underlying buffer size because a dummy slot is needed to distinguish "full" from "empty"
size_type capacity() const {
return m_buffer.size() - 1;
}
// queue is a fixed size, just return the capacity
size_type max_size() const {
return capacity();
}
private:
std::atomic<size_type> m_write_index, m_read_index;
const size_type m_capacity;
std::vector<value_type> m_buffer;
};

#endif // SPSC_QUEUE_H_
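A minimal end-to-end sketch of the intended usage: exactly one producer thread calling try_push() and exactly one consumer thread draining with pop_all(). The std::thread setup, the int payload, the stop flag, and the "spsc_queue.h" file name are illustrative assumptions rather than part of the queue itself.

#include <atomic>
#include <iostream>
#include <thread>
#include "spsc_queue.h" // assumed file name for the header above

int main() {
    spsc_queue<int> queue(1024);
    std::atomic<bool> done{false};

    // single producer: push 1000 integers, yielding while the queue is full
    std::thread producer([&] {
        for (int i = 0; i < 1000; ++i) {
            while (!queue.try_push(i)) {
                std::this_thread::yield();
            }
        }
        done.store(true, std::memory_order_release);
    });

    // single consumer: drain in batches with pop_all()
    long long sum = 0;
    while (!done.load(std::memory_order_acquire) || !queue.is_empty()) {
        for (int value : queue.pop_all()) {
            sum += value;
        }
        std::this_thread::yield();
    }

    producer.join();
    std::cout << "sum = " << sum << std::endl; // 0 + 1 + ... + 999 == 499500
    return 0;
}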