Skip to content

Instantly share code, notes, and snippets.

@EvanBalster
Last active March 7, 2022 22:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EvanBalster/77dfa7229ca94aa1eb49d54a06b907dd to your computer and use it in GitHub Desktop.
Save EvanBalster/77dfa7229ca94aa1eb49d54a06b907dd to your computer and use it in GitHub Desktop.
C++98 lockfree ringbuffer without <atomic> (reliable under MSVC, no issues observed on ARM)
#ifndef PLAID_ASYNC_RING_H
#define PLAID_ASYNC_RING_H
#include <algorithm>
namespace plaid
{
/*
This class houses general-purpose logic for managing asynchronous ringbuffers.
*/
class async_ring_control
{
public:
async_ring_control(size_t capacity) : c(capacity), r(0), w(0) {}
/*
Query the maximum capacity of the ring, count the items contained or query free slots.
These functions run in constant time.
These may be called from producer OR consumer threads.
*/
size_t capacity() const {return c;}
bool empty() const {return r == w;}
bool full() const {return (r+c == w) | (w+c == r);}
// Check the number of available items or free spaces.
size_t r_avail() const {return (w < r) ? (2*c+w-r) : (w-r);}
size_t w_avail() const {return c - r_avail();}
// Check if the given number of elements can be safely read or written.
bool w_ok() const {return !full();}
bool w_ok(size_t n) const {return w_avail() >= n;}
bool r_ok() const {return !empty();}
bool r_ok(size_t n) const {return r_avail() >= n;}
// Get the position for reading or writing.
size_t w_pos() const {return w % c;}
size_t w_pos(size_t offset) const {return (w + offset) % c;}
size_t r_pos() const {return r % c;}
size_t r_pos(size_t offset) const {return (r + offset) % c;}
// Calculate maximum contiguous read or write, possibly including an offset.
size_t w_cont() const {return c - ( w % c);}
size_t w_cont(size_t offset) const {return c - ((w+offset) % c);}
size_t r_cont() const {return c - ( r % c);}
size_t r_cont(size_t offset) const {return c - ((r+offset) % c);}
// Advance position for reading or writing, after completing an atomic operation.
void w_adv() {w = (w + 1) % (2*c);}
void w_adv(size_t n) {w = (w + n) % (2*c);}
void r_adv() {r = (r + 1) % (2*c);}
void r_adv(size_t n) {r = (r + n) % (2*c);}
// Clear elements
void clear() {r = w;}
void clear_to_one() {auto p = w; r = (r==p) ? p : ((p==0) ? (2*c-1) : p-1);}
public:
// Fields. Treat with extreme care.
const size_t c;
volatile size_t r, w;
};
/*
A ringbuffer: The most simple, robust and performant lockfree structure.
It has the drawback of a limited capacity and requires a type with a
default constructor as it stores [capacity] objects by value.
Unlike the above structures, multiple values may be written to or read
from the ringbuffer "simultaneously" for multithreading purposes.
This averts race conditions when transferring blocks of data.
(Internally a double-range trick is used to allow use of all elements.)
Variants:
async_ring_at uses an array provided by the user.
async_ring simply uses new[] and delete[].
*/
template<typename T>
class async_ring_at
{
public:
/*
Creation and destruction should occur in the same thread.
It is safe to construct a ring of capacity 0, but all other behaviors are undefined.
*/
async_ring_at(size_t capacity, T *_store) : c(capacity), store(_store) {}
~async_ring_at() {}
/*
A successful push writes t into the ringbuffer and returns true;
If the queue is full, has no effect and returns false.
Only the producer should call this method.
*/
bool push(const T &t) {if (c.w_ok()) {store[c.w_pos()] = t; c.w_adv(); return 1;} return 0;}
/*
Instead of a copying push, allocate an item with push_start and push with push_finish.
Don't proceed with push_finish unless push_start was successful!
Only the producer should call these methods.
*/
T *push_start() {return full() ? nullptr : &store[c.w_pos()];}
bool push_finish() {if (full()) return 0; c.w_adv(); return 1;}
/*
These functions allow the consumer to access the new items in the queue, if any.
They return whether they were successful.
pull --- copies the next item (overwriting t) and removes it.
peek --- copies the next item (overwriting t).
pop --- removes the next item.
pop_all --- removes all items.
pop_to_one --- removes all items but the last.
pull_last --- copies the last item (overwriting t) and removes all items.
Only the CONSUMER should call these methods.
*/
bool pull(T &t) {return empty() ? false : (t = store[c.r_pos()], c.r_adv(), true);}
bool peek(T &t) const {return empty() ? false : (t = store[c.r_pos()], true);}
bool pop () {return empty() ? false : ( c.r_adv(), true);}
void pop_all() {c.clear();}
bool pop_to_one() {return empty() ? false : (c.clear_to_one(), true);}
bool pull_last(T &t) {pop_to_one(); return pull(t);}
/*
Peek with pointer. This avoids a copy, but the item becomes invalid after being popped.
Returns NULL if the queue is empty.
Only the CONSUMER should call these methods.
*/
T *peek_ptr() {if (empty()) {return 0;} return &store[c.r_pos()];}
const T *peek_ptr() const {if (empty()) {return 0;} return &store[c.r_pos()];}
/*
Access the last item that was pushed into the ring.
This may be called from producer OR consumer threads,
but the result is invalid if the ring is empty.
*/
const T &last_pushed() const {return store[c.w_pos(c.capacity()-1)];}
/*
Query the maximum capacity of the ring, count the items contained or query free slots.
These functions run in constant time.
These may be called from producer OR consumer threads.
*/
size_t capacity() const {return c.capacity();}
size_t count() const {return c.r_avail();}
size_t freeSlots() const {return c.w_avail();}
bool empty() const {return c.empty();}
bool full() const {return c.full();}
bool can_write() const {return c.w_ok();}
bool can_write(size_t n) const {return c.w_ok(n);}
bool can_read () const {return c.r_ok();}
bool can_read (size_t n) const {return c.r_ok(n);}
/*
Push multiple elements simultaneously. Call only from producer.
If there is not enough capacity the push will have no effect.
Return value and force parameter behave as above.
*/
bool push(const T *array, size_t count)
{
if (!can_write(count)) return false;
return push_some(array, count) == count;
}
/*
Pull multiple elements simultaneously. Call only from consumer.
If there is not enough data the pull will consume no elements.
Return value indicates success or failure as above.
*/
bool pull(T *array, size_t count)
{
if (!can_read(count)) return false;
return pull_some(array, count) == count;
}
/*
Similar to the above, but permits partial success.
Returns number of elements pushed or pulled.
*/
size_t push_some(const T *array, size_t count)
{
size_t done = 0;
while (done < count)
{
auto chunk = write_chunk(count-done, done);
if (chunk.count == 0) break;
_copy(chunk.ptr, array+done, chunk.count);
done += chunk.count;
}
write_finish(done);
return done;
}
size_t pull_some(T *array, size_t count)
{
size_t done = 0;
while (done < count)
{
auto chunk = read_chunk(count-done, done);
if (chunk.count == 0) break;
_copy(array+done, chunk.ptr, chunk.count);
done += chunk.count;
}
read_finish(done);
return done;
}
/*
Instead of a copying push, write items directly with these calls:
if (ring.can_write(n))
{
size_t written = 0;
while (written < n)
{
auto chunk = ring.write_chunk(n-written, written);
if (chunk.count == 0) break;
// ...write to chunk...
written += chunk.count;
}
ring.write_finish(written);
}
*/
struct write_array
{
T *ptr;
size_t count;
};
struct read_array
{
const T *ptr;
size_t count;
};
write_array write_chunk(size_t n)
{
return {&store[c.w_pos()], std::min(std::min(n, c.w_cont()), c.w_avail())};
}
read_array read_chunk(size_t n) const
{
return {&store[c.r_pos()], std::min(std::min(n, c.r_cont()), c.r_avail())};
}
write_array write_chunk(size_t n, size_t offset)
{
auto avail = c.w_avail(); if (offset >= avail) return {0,0};
return {&store[c.w_pos(offset)], std::min(std::min(n, c.w_cont(offset)), avail-offset)};
}
read_array read_chunk(size_t n, size_t offset) const
{
auto avail = c.r_avail(); if (offset >= avail) return {0,0};
return {&store[c.r_pos(offset)], std::min(std::min(n, c.r_cont(offset)), avail-offset)};
}
bool read_finish (size_t nTotal) {return c.r_ok(nTotal) ? (c.r_adv(nTotal), true) : false;}
bool write_finish(size_t nTotal) {return c.w_ok(nTotal) ? (c.w_adv(nTotal), true) : false;}
private:
//No copying!
async_ring_at (const async_ring_at &other) = delete;
async_ring_at &operator=(const async_ring_at &other) = delete;
private:
async_ring_control c;
static void _copy(T *dst, const T *src, size_t n)
{
for (auto end = dst+n; dst < end; ++dst, ++src) *dst = *src;
}
public:
//The ringbuffer's storage. Obviously, be careful with this.
T *store;
};
/*
An async_ring whose capacity is built-in. Elements will be default-instantiated...
*/
template<typename T, size_t Capacity>
class async_ring_auto : public async_ring_at<T>
{
public:
// Construct.
async_ring_auto() : async_ring_at<T>(Capacity, _buffer) {}
~async_ring_auto() {}
// Construct and initialize each item with a functor taking T&.
template<typename InitFunc>
async_ring_auto(InitFunc init) :
async_ring_at<T>(Capacity, _buffer) {for (auto &i: _buffer) init(i);}
private:
T _buffer[Capacity];
};
/*
An async_ring which allocates its own memory.
*/
template<typename T>
class async_ring : public async_ring_at<T>
{
public:
// Construct.
async_ring(size_t capacity = 16) : async_ring_at<T>(capacity, new T[capacity]) {}
~async_ring() {delete[] async_ring_at<T>::store;};
// Construct and initialize each item with a functor taking T&.
template<typename InitFunc>
async_ring(size_t capacity, InitFunc init) :
async_ring_at<T>(capacity, new T[capacity]) {for (size_t i=0; i<capacity; ++i) init(async_ring_at<T>::store[i]);}
};
}
#endif // PLAID_ASYNC_RING_H
@EvanBalster
Copy link
Author

This data structure was written against C++98 and would obviously be improved by using std::atomic or memory fences. It's provided here for demonstrative purposes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment