Skip to content

Instantly share code, notes, and snippets.

@jac18281828
Created February 29, 2024 17:39
Show Gist options
  • Save jac18281828/c8b7af2af918d02b755aea699253633c to your computer and use it in GitHub Desktop.
Save jac18281828/c8b7af2af918d02b755aea699253633c to your computer and use it in GitHub Desktop.
Single-producer, single-consumer (SPSC) FIFO queue
#pragma once
#include <atomic>
#include <cstdlib>
#include <memory>
#include <new>
#include <utility>
/// Bounded single-producer / single-consumer FIFO queue on top of a ring buffer.
///
/// Intended use: one thread calls push() and one (possibly different) thread
/// calls pop(). Each side owns one index (`tail` for the producer, `head` for
/// the consumer) and publishes it with a release store; the other side reads it
/// with an acquire load so that the element's construction/destruction is
/// visible before the slot is reused.
///
/// Storage for `capacity` elements is obtained from `Alloc` at construction and
/// elements are constructed/destroyed in place through `std::allocator_traits`.
/// The queue is neither copyable nor movable.
///
/// @tparam T     Element type. Must be move-constructible (push) and
///               move-assignable (pop); it need not be default-constructible.
/// @tparam Alloc Allocator used for the ring buffer storage.
template <typename T, typename Alloc = std::allocator<T>>
class spsc : private Alloc
{
public:
    using allocator_traits = std::allocator_traits<Alloc>;
    using value_type = T;
    using size_type = typename allocator_traits::size_type;

private:
    static constexpr auto cache_alignment = size_type{64};

    // Immutable after construction; both threads only read these, so they can
    // share a cache line with each other (but not with the mutable indices).
    const size_type capacity;
    T *ring_buffer;

    // Each index is written by exactly one thread. Giving every index its own
    // 64-byte-aligned slot keeps producer and consumer writes off each other's
    // line and off the read-only fields above.
    alignas(cache_alignment) std::atomic<size_type> head; // next slot to pop  (written by consumer)
    alignas(cache_alignment) std::atomic<size_type> tail; // next slot to push (written by producer)

    // Access to the privately inherited allocator.
    Alloc &alloc_ref() noexcept
    {
        return *this;
    }

public:
    /// Creates an empty queue able to hold `capacity` elements.
    ///
    /// @param capacity Maximum number of elements stored at once. A capacity of
    ///                 0 yields a queue on which every push() fails.
    /// @param alloc    Allocator copied into the queue and used for storage.
    explicit spsc(size_type capacity, Alloc const &alloc = Alloc{})
        : Alloc{alloc},
          capacity{capacity},
          ring_buffer{allocator_traits::allocate(alloc_ref(), capacity)},
          head{0},
          tail{0}
    {
    }

    // A queue without a capacity/buffer is meaningless (and the const member
    // already made the defaulted constructor implicitly deleted).
    spsc() = delete;
    spsc(const spsc &) = delete;
    spsc(spsc &&) = delete;
    spsc &operator=(const spsc &) = delete;
    spsc &operator=(spsc &&) = delete;

    /// Destroys any elements still queued and releases the buffer.
    /// The caller must make sure no other thread is using the queue any more.
    ~spsc()
    {
        auto const last = tail.load(std::memory_order_acquire);
        // Destroy leftovers in place: no temporary T is needed, so T does not
        // have to be default-constructible.
        for (auto cursor = head.load(std::memory_order_acquire); cursor != last; ++cursor)
        {
            allocator_traits::destroy(alloc_ref(), ring_buffer + (cursor % capacity));
        }
        allocator_traits::deallocate(alloc_ref(), ring_buffer, capacity);
    }

    /// Appends `value` to the queue (producer side).
    ///
    /// @param value Element to enqueue; it is moved into the ring buffer.
    /// @return true if the element was stored, false if the queue was full.
    bool push(T value)
    {
        // Only the producer writes `tail`, so a relaxed load of our own index is enough.
        auto const write_index = tail.load(std::memory_order_relaxed);
        // Acquire pairs with the consumer's release store in pop(): once we see
        // the slot as free, the consumer has finished moving from / destroying it.
        auto const read_index = head.load(std::memory_order_acquire);
        if (static_cast<size_type>(write_index - read_index) == capacity)
        {
            return false;
        }
        allocator_traits::construct(alloc_ref(), ring_buffer + (write_index % capacity), std::move(value));
        // Publish the new element; `tail` is advanced only after construction succeeded.
        tail.store(write_index + 1, std::memory_order_release);
        return true;
    }

    /// Removes the oldest element from the queue (consumer side).
    ///
    /// @param value Receives the dequeued element by move assignment; left
    ///              untouched when the queue is empty.
    /// @return true if an element was dequeued, false if the queue was empty.
    bool pop(T &value)
    {
        // Only the consumer writes `head`, so a relaxed load of our own index is enough.
        auto const read_index = head.load(std::memory_order_relaxed);
        // Acquire pairs with the producer's release store in push(): the element
        // in the slot is fully constructed before we read it.
        if (read_index == tail.load(std::memory_order_acquire))
        {
            return false;
        }
        T *const slot = ring_buffer + (read_index % capacity);
        value = std::move(*slot);
        allocator_traits::destroy(alloc_ref(), slot);
        // Hand the slot back to the producer only after the element is destroyed.
        head.store(read_index + 1, std::memory_order_release);
        return true;
    }

    /// Returns the number of elements in the queue.
    /// While the other thread is active this is only a snapshot.
    size_type size() const noexcept
    {
        // `head` is read first so that the later `tail` read is never behind it.
        auto const read_index = head.load(std::memory_order_acquire);
        auto const write_index = tail.load(std::memory_order_acquire);
        return static_cast<size_type>(write_index - read_index);
    }

    /// Returns true if the queue currently holds no elements.
    bool empty() const noexcept
    {
        return size() == 0;
    }

    /// Returns the capacity of the queue.
    size_type max_size() const noexcept
    {
        return capacity;
    }
};
#include <thread>
#include "gtest/gtest.h"
#include "spsc.h"
// Pushing succeeds until the queue holds `capacity` elements; the next push is rejected.
TEST(spscTest, TestPush)
{
    using size_type = std::size_t;
    constexpr size_type kCapacity = 10;
    spsc<size_type> queue(kCapacity);
    for (size_type item = 1; item <= kCapacity; ++item)
    {
        ASSERT_TRUE(queue.push(item));
    }
    // The queue is full now, so one more element must not fit.
    ASSERT_FALSE(queue.push(kCapacity + 1));
}
// size() grows by one per accepted push and stays at capacity once the queue is full.
TEST(spscTest, TestSize)
{
    using size_type = std::size_t;
    constexpr size_type kCapacity = 10;
    spsc<size_type> queue(kCapacity);
    ASSERT_EQ(queue.size(), size_type{0});
    for (size_type item = 1; item <= kCapacity; ++item)
    {
        queue.push(item);
        ASSERT_EQ(queue.size(), item);
    }
    // A push onto the full queue must leave the size unchanged.
    queue.push(kCapacity + 1);
    ASSERT_EQ(queue.size(), kCapacity);
}
// Elements come out in the order they were pushed; popping an empty queue fails.
TEST(spscTest, PopTest) {
    using size_type = std::size_t;
    constexpr size_type kCapacity = 10;
    spsc<size_type> queue(kCapacity);
    for (size_type item = 1; item <= kCapacity; ++item) {
        queue.push(item);
    }
    size_type popped = 0;
    for (size_type expected = 1; expected <= kCapacity; ++expected) {
        ASSERT_TRUE(queue.pop(popped));
        ASSERT_EQ(popped, expected);
    }
    // Everything has been drained, so there is nothing left to pop.
    ASSERT_FALSE(queue.pop(popped));
}
// empty() is true for a fresh queue, false after a push, and true again after the pop.
TEST(spscTest, EmptyTest) {
    using size_type = std::size_t;
    spsc<size_type> queue(10);
    ASSERT_TRUE(queue.empty());
    ASSERT_TRUE(queue.push(size_type{1}));
    ASSERT_FALSE(queue.empty());
    // Use the queue's element type for the out-parameter: an `unsigned long`
    // (from a `0UL` literal) cannot bind to `std::size_t&` on platforms where
    // std::size_t is `unsigned long long`.
    size_type value = 0;
    ASSERT_TRUE(queue.pop(value));
    ASSERT_TRUE(queue.empty());
}
// max_size() reports the capacity passed to the constructor.
TEST(spscTest, MaxSizeTest) {
    using size_type = std::size_t;
    constexpr size_type kCapacity = 10;
    spsc<size_type> queue(kCapacity);
    ASSERT_EQ(queue.max_size(), kCapacity);
}
// One producer thread and one consumer thread stream a sequence of integers
// through a small queue; the consumer must observe them in order.
TEST(spscTest, MultiThreadTest) {
    using size_type = std::size_t;
    static constexpr size_type kItemCount = 100000;
    spsc<size_type> queue(10);
    // Written only by the consumer thread and read after join(), so no atomic is needed.
    size_type mismatches = 0;
    std::thread producer([&queue]() {
        for (size_type i = 0; i < kItemCount; ++i) {
            while (!queue.push(i)) {
                std::this_thread::yield();
            }
        }
    });
    std::thread consumer([&queue, &mismatches]() {
        for (size_type i = 0; i < kItemCount; ++i) {
            size_type value = 0;
            while (!queue.pop(value)) {
                std::this_thread::yield();
            }
            // Do not ASSERT_* here: a fatal assertion would return from the
            // lambda, the queue would stop being drained, and the producer
            // would spin forever on a full queue (hanging the join below).
            if (value != i) {
                ++mismatches;
            }
        }
    });
    producer.join();
    consumer.join();
    ASSERT_EQ(mismatches, size_type{0});
    ASSERT_TRUE(queue.empty());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment