Instantly share code, notes, and snippets.

Embed
What would you like to do?
Thread Safe SPSC Queue Lockless Sequential Memory
// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;
#define RING_BUFFER_SIZE 1000
class lockless_ring_buffer_spsc
{
public :
lockless_ring_buffer_spsc()
{
write.store(0);
read.store(0);
}
bool tryPush(T val)
{
const auto current_tail = m_write.load();
const auto next_tail = increment(current_tail);
if (current_tail - m_read.load() <= m_capacity -1 )
{
m_buffer.get()[current_tail % m_capacity ] = val;
m_write.store(next_tail);
return true;
}
return false;
}
void push(int64_t val)
{
while( ! try_push(val) );
}
bool tryPop(T* element)
{
const auto currentHead = m_read.load();
if (currentHead != m_write.load())
{
*element = m_buffer.get()[currentHead % m_capacity];
m_read.store(increment(currentHead));
return true;
}
return false;
}
int64_t pop()
{
int64_t ret;
while( ! try_pop(&ret) );
return ret;
}
private :
std::atomic<int64_t> write ;
std::atomic<int64_t> read;
int64_t size = RING_BUFFER_SIZE;
int64_t buffer[RING_BUFFER_SIZE];
int64_t increment(int n)
{
return (n + 1);
}
};
int main (int argc, char** argv)
{
lockless_ring_buffer_spsc queue;
std::thread write_thread( [&] () {
for(int i = 0; i<1000000; i++)
{
queue.push(i);
}
} // End of lambda expression
);
std::thread read_thread( [&] () {
for(int i = 0; i<1000000; i++)
{
queue.pop();
}
} // End of lambda expression
);
write_thread.join();
read_thread.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment