Skip to content

Instantly share code, notes, and snippets.

@hotgloupi
Last active February 3, 2017 14:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hotgloupi/0e970a0e11091dfb0df8f942a0352b87 to your computer and use it in GitHub Desktop.
Save hotgloupi/0e970a0e11091dfb0df8f942a0352b87 to your computer and use it in GitHub Desktop.
basic ringbuffer draft for Boost.Lockfree
namespace lf = boost::lockfree;
struct packet_ringbuffer_policies : lf::default_ringbuffer_policies
{
typedef std::uin16_t buffer_size_t;
struct buffer
{
buffer_size_t size;
byte_t data;
};
};
struct packet_ringbuffer
: public lf::storage::heap<packet_ringbuffer_policies>
, public lf::basic_ringbuffer<packet_ringbuffer, packet_ringbuffer_policies>
{
typedef lf::storage::heap<packet_ringbuffer_policies> storage_t;
packet_ringbuffer(size_t size) : storage_t(size)
{}
};
#pragma once
#include <array>
#include <atomic>
#include <cstdint>
#include <type_traits>
#include <utility>
namespace boost { namespace lockfree {
#define BOOST_LOCKFREE_CACHELINE_BYTES 64
// Default policies used by basic_ringbuffer
struct default_ringbuffer_policies
{
// buffer's byte type
typedef std::uint8_t byte_t;
// type used to represent ring size and indices
typedef std::size_t ring_size_t;
// type used to represent a buffer size
typedef std::size_t buffer_size_t;
// buffer type
struct buffer
{
buffer_size_t size;
byte_t data[];
};
// a buffer_size > 0 means that we have fixed size buffers
static constexpr buffer_size_t buffer_size = 0;
};
// basic_ringbuffer encapsulate the logic about lockfree buffer
// manipulations.
template <typename Self, typename Policies>
struct basic_ringbuffer
{
static_assert(
std::is_base_of<default_ringbuffer_policies, Policies>::value,
"In order to be backward compatible when new policies are added");
public:
typedef typename Policies::byte_t byte_t;
typedef typename Policies::ring_size_t ring_size_t;
typedef typename Policies::buffer_size_t buffer_size_t;
typedef typename Policies::buffer buffer_t;
protected:
std::atomic<ring_size_t> _write_end;
alignas(BOOST_LOCKFREE_CACHELINE_BYTES)
std::atomic<ring_size_t> _read_end;
public:
basic_ringbuffer()
: _write_end(0)
, _read_end(0)
{}
public:
// Reserve a buffer in the ring and returns it. Returns nullptr if the
// ringbuffer is full.
buffer_t* start_write(buffer_size_t const size) noexcept;
// Commit size bytes into the ringbuffer.
// precondition: a buffer was previously returned by start_write()
// precondition: size <= previously asked size
void commit_write(buffer_size_t const size) noexcept;
// Commit buffer into the ringbuffer.
// precondition: buffer was previously returned by start_write()
// precondition: buffer->size <= previously asked size
void commit_write(buffer_t* buffer) noexcept;
// Returns the next buffer or nullptr if the ringbuffer is empty.
buffer_t const* start_read() noexcept;
// Commit the last read buffer.
void commit_read(buffer_t const* buffer) noexcept;
private:
Self& _self() noexcept { return static_cast<Self&>(*this); }
};
// Default policies used by basic_ring
template<typename T>
struct default_ring_policies : public default_ringbuffer_policies
{
typedef T element_t;
static constexpr buffer_size_t buffer_size = sizeof(element_t);
};
// basic_ring encapsulate the logic for fixed size elements
template <typename Self, typename Policies> struct basic_ring
{
static_assert(
std::is_base_of<default_ring_policies<typename Policies::element_t>,
Policies>::value,
"In order to be backward compatible when new policies are added");
public:
typedef typename Policies::element_t element_t;
typedef typename Policies::buffer buffer_t;
public:
// Push an element into the ring and returns true on success
template <typename... Args> bool push(Args&&... args)
{
if (buffer_t* buf = _self().start_write(sizeof(element_t)))
{
new (&buf.data[0]) element_t(std::forward<Args>(args)...);
_self().commit_write(buf);
return true;
}
return false;
}
// Pop an element and move it into out. Returns true on success
bool pop(element_t& out)
{
if (buffer_t const* buf = _self().start_read())
{
element_t& src = *reinterpret_cast<element_t*>(&buf.data[0]);
out = std::move(src);
src.~element_t();
_self().commit_read(buf);
return true;
}
return false;
}
private:
Self& _self() noexcept { return static_cast<Self&>(*this); }
};
namespace storage
{
// malloc/free storage
template <typename Policies> struct heap
{
public:
typedef typename Policies::ring_size_t ring_size_t;
typedef typename Policies::byte_t byte_t;
private:
ring_size_t const _capacity;
byte_t* const _ring;
public:
explicit heap(ring_size_t const size)
: _capacity(size)
, _ring((byte_t*)std::malloc(size))
{
if (_ring == nullptr) throw std::bad_alloc();
}
heap(heap const&) = delete;
heap& operator=(heap const&) = delete;
~heap() { std::free(_ring); }
public:
ring_size_t capacity() const { return _capacity; }
byte_t const* ring() const { return _ring; }
byte_t* ring() { return _ring; }
};
// stack storage
template<size_t size, typename Policies> struct stack
{
public:
typedef typename Policies::ring_size_t ring_size_t;
typedef typename Policies::byte_t byte_t;
private:
std::array<byte_t, size> _ring;
public:
stack() = default;
stack(stack const&) = delete;
stack& operator=(stack const&) = delete;
public:
ring_size_t capacity() const { return size; }
byte_t const* ring() const { return &_ring[0]; }
byte_t* ring() { return &_ring[0]; }
};
}
template <typename T, typename Policies = default_ring_policies<T>>
struct spsc_queue
: public storage::heap<Policies>,
public basic_ringbuffer<spsc_queue<T, Policies>, Policies>,
public basic_ring<spsc_queue<T, Policies>, Policies>
{
public:
explicit spsc_queue(size_t size)
: storage::heap<Policies>(size)
{}
};
template <typename T,
size_t size,
typename Policies = default_ring_policies<T>>
struct fixed_spsc_queue
: public storage::stack<sizeof(T) * size, Policies>,
public basic_ringbuffer<fixed_spsc_queue<T, size, Policies>, Policies>,
public basic_ring<fixed_spsc_queue<T, size, Policies>, Policies>
{};
}}
@timblechmann
Copy link

two points:

  • changing the API is not an option
  • c++11 isn't really an option, either. i'm not sure if there is any policy of dropping c++03 support for old libraries

maybe branching off boost.lockfree may be a good start?

@timblechmann
Copy link

as for packet_ringbuffer ... i'd prefer if storage and basic_ringbuffer would not be exposed to the public interface, but that we do all the configuration with policy classes

@hotgloupi
Copy link
Author

hotgloupi commented Feb 3, 2017

Sorry for the delay, I wasn't notified that you commented in here :)

changing the API is not an option

I'm not proposing to change the API at all, that's just a draft to explore how the underlying classes could look like.

c++11 isn't really an option, either. i'm not sure if there is any policy of dropping c++03 support for old libraries

No worries, I'm using c++11 only for convenience, everything (except the variadic push) should be easily convertible to c++98

maybe branching off boost.lockfree may be a good start?

Ok, will do, might be simpler.

as for packet_ringbuffer ... i'd prefer if storage and basic_ringbuffer would not be exposed to the public interface, but that we do all the configuration with policy classes

I'm not sure yet about how things should be combined. The way I see it, I'm using inheritance to mix different concerns together. It would be possible to inherit privately and make some components friends. In any case the packet_ringuffer is just an example of a user defined class.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment