-
-
Save hotgloupi/0e970a0e11091dfb0df8f942a0352b87 to your computer and use it in GitHub Desktop.
namespace lf = boost::lockfree; | |
struct packet_ringbuffer_policies : lf::default_ringbuffer_policies | |
{ | |
typedef std::uin16_t buffer_size_t; | |
struct buffer | |
{ | |
buffer_size_t size; | |
byte_t data; | |
}; | |
}; | |
struct packet_ringbuffer | |
: public lf::storage::heap<packet_ringbuffer_policies> | |
, public lf::basic_ringbuffer<packet_ringbuffer, packet_ringbuffer_policies> | |
{ | |
typedef lf::storage::heap<packet_ringbuffer_policies> storage_t; | |
packet_ringbuffer(size_t size) : storage_t(size) | |
{} | |
}; |
#pragma once | |
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>
namespace boost { namespace lockfree { | |
// Alignment, in bytes, used with alignas() inside basic_ringbuffer.
// Guarded so that a definition already provided by another header (or by the
// build) is reused instead of being redefined here.
#ifndef BOOST_LOCKFREE_CACHELINE_BYTES
#define BOOST_LOCKFREE_CACHELINE_BYTES 64
#endif
// Baseline policy set consumed by basic_ringbuffer. basic_ringbuffer requires
// its Policies argument to derive from this struct.
struct default_ringbuffer_policies
{
    // Byte type of a buffer's data.
    using byte_t = std::uint8_t;

    // Integer type for the ring size and for indices into the ring.
    using ring_size_t = std::size_t;

    // Integer type holding the size of one buffer.
    using buffer_size_t = std::size_t;

    // Buffer type: a size field followed by an unsized byte array.
    struct buffer
    {
        buffer_size_t size;
        byte_t data[];
    };

    // Zero selects variable-size buffers; any value > 0 means that all
    // buffers have that fixed size.
    static constexpr buffer_size_t buffer_size = 0;
};
// basic_ringbuffer encapsulate the logic about lockfree buffer | |
// manipulations. | |
template <typename Self, typename Policies> | |
struct basic_ringbuffer | |
{ | |
static_assert( | |
std::is_base_of<default_ringbuffer_policies, Policies>::value, | |
"In order to be backward compatible when new policies are added"); | |
public: | |
typedef typename Policies::byte_t byte_t; | |
typedef typename Policies::ring_size_t ring_size_t; | |
typedef typename Policies::buffer_size_t buffer_size_t; | |
typedef typename Policies::buffer buffer_t; | |
protected: | |
std::atomic<ring_size_t> _write_end; | |
alignas(BOOST_LOCKFREE_CACHELINE_BYTES) | |
std::atomic<ring_size_t> _read_end; | |
public: | |
basic_ringbuffer() | |
: _write_end(0) | |
, _read_end(0) | |
{} | |
public: | |
// Reserve a buffer in the ring and returns it. Returns nullptr if the | |
// ringbuffer is full. | |
buffer_t* start_write(buffer_size_t const size) noexcept; | |
// Commit size bytes into the ringbuffer. | |
// precondition: a buffer was previously returned by start_write() | |
// precondition: size <= previously asked size | |
void commit_write(buffer_size_t const size) noexcept; | |
// Commit buffer into the ringbuffer. | |
// precondition: buffer was previously returned by start_write() | |
// precondition: buffer->size <= previously asked size | |
void commit_write(buffer_t* buffer) noexcept; | |
// Returns the next buffer or nullptr if the ringbuffer is empty. | |
buffer_t const* start_read() noexcept; | |
// Commit the last read buffer. | |
void commit_read(buffer_t const* buffer) noexcept; | |
private: | |
Self& _self() noexcept { return static_cast<Self&>(*this); } | |
}; | |
// Default policies used by basic_ring | |
template<typename T> | |
struct default_ring_policies : public default_ringbuffer_policies | |
{ | |
typedef T element_t; | |
static constexpr buffer_size_t buffer_size = sizeof(element_t); | |
}; | |
// basic_ring encapsulate the logic for fixed size elements | |
template <typename Self, typename Policies> struct basic_ring | |
{ | |
static_assert( | |
std::is_base_of<default_ring_policies<typename Policies::element_t>, | |
Policies>::value, | |
"In order to be backward compatible when new policies are added"); | |
public: | |
typedef typename Policies::element_t element_t; | |
typedef typename Policies::buffer buffer_t; | |
public: | |
// Push an element into the ring and returns true on success | |
template <typename... Args> bool push(Args&&... args) | |
{ | |
if (buffer_t* buf = _self().start_write(sizeof(element_t))) | |
{ | |
new (&buf.data[0]) element_t(std::forward<Args>(args)...); | |
_self().commit_write(buf); | |
return true; | |
} | |
return false; | |
} | |
// Pop an element and move it into out. Returns true on success | |
bool pop(element_t& out) | |
{ | |
if (buffer_t const* buf = _self().start_read()) | |
{ | |
element_t& src = *reinterpret_cast<element_t*>(&buf.data[0]); | |
out = std::move(src); | |
src.~element_t(); | |
_self().commit_read(buf); | |
return true; | |
} | |
return false; | |
} | |
private: | |
Self& _self() noexcept { return static_cast<Self&>(*this); } | |
}; | |
namespace storage | |
{ | |
// malloc/free storage: owns a block of `size` bytes obtained from
// std::malloc and released with std::free on destruction.
template <typename Policies>
struct heap
{
public:
    typedef typename Policies::ring_size_t ring_size_t;
    typedef typename Policies::byte_t byte_t;

private:
    ring_size_t const _capacity;
    byte_t* const _ring;

public:
    // Allocates `size` bytes.
    // Throws std::bad_alloc when std::malloc returns a null pointer.
    explicit heap(ring_size_t const size)
        : _capacity(size)
        , _ring(static_cast<byte_t*>(std::malloc(size)))
    {
        if (_ring == nullptr)
            throw std::bad_alloc();
    }

    // The block has a single owner: copying is disabled.
    heap(heap const&) = delete;
    heap& operator=(heap const&) = delete;

    ~heap() { std::free(_ring); }

public:
    // Size given at construction.
    ring_size_t capacity() const { return _capacity; }

    // Start of the allocated block.
    byte_t const* ring() const { return _ring; }
    byte_t* ring() { return _ring; }
};
// stack storage: the ring is a std::array of `size` bytes embedded in the
// object itself.
template <std::size_t size, typename Policies>
struct stack
{
public:
    typedef typename Policies::ring_size_t ring_size_t;
    typedef typename Policies::byte_t byte_t;

private:
    std::array<byte_t, size> _ring;

public:
    // The bytes are left uninitialized.
    stack() = default;

    // Copying is disabled, as for the heap storage.
    stack(stack const&) = delete;
    stack& operator=(stack const&) = delete;

public:
    // Number of bytes in the ring (the `size` template argument).
    ring_size_t capacity() const { return size; }

    // Start of the ring. data() is used rather than &_ring[0] because
    // indexing an empty std::array is undefined behaviour.
    byte_t const* ring() const { return _ring.data(); }
    byte_t* ring() { return _ring.data(); }
};
} | |
template <typename T, typename Policies = default_ring_policies<T>> | |
struct spsc_queue | |
: public storage::heap<Policies>, | |
public basic_ringbuffer<spsc_queue<T, Policies>, Policies>, | |
public basic_ring<spsc_queue<T, Policies>, Policies> | |
{ | |
public: | |
explicit spsc_queue(size_t size) | |
: storage::heap<Policies>(size) | |
{} | |
}; | |
template <typename T, | |
size_t size, | |
typename Policies = default_ring_policies<T>> | |
struct fixed_spsc_queue | |
: public storage::stack<sizeof(T) * size, Policies>, | |
public basic_ringbuffer<fixed_spsc_queue<T, size, Policies>, Policies>, | |
public basic_ring<fixed_spsc_queue<T, size, Policies>, Policies> | |
{}; | |
}} |
As for `packet_ringbuffer`: I'd prefer that `storage` and `basic_ringbuffer` not be exposed in the public interface, and that we do all the configuration with policy classes instead.
Sorry for the delay, I wasn't notified that you commented in here :)
changing the API is not an option
I'm not proposing to change the API at all, that's just a draft to explore how the underlying classes could look like.
c++11 isn't really an option, either. i'm not sure if there is any policy of dropping c++03 support for old libraries
No worries, I'm using c++11 only for convenience, everything (except the variadic push) should be easily convertible to c++98
maybe branching off boost.lockfree may be a good start?
Ok, will do, might be simpler.
as for packet_ringbuffer ... i'd prefer if storage and basic_ringbuffer would not be exposed to the public interface, but that we do all the configuration with policy classes
I'm not sure yet about how things should be combined. The way I see it, I'm using inheritance to mix different concerns together. It would be possible to inherit privately and make some components friends. In any case, `packet_ringbuffer` is just an example of a user-defined class.
two points:
maybe branching off boost.lockfree may be a good start?