@ryul1206
Created November 16, 2019 11:05
// 2018. Hong-ryul Jung
#pragma once
#include <array>
#include <atomic>
#include <vector>
#include <cstring>
#include <thread>
#define __CON_QUEUE_SIZE__ 2048 // 1 << 11
#define __QUEUE_IDX_MASK__ 2047 // __CON_QUEUE_SIZE__ - 1
// cache line is 64 bytes
template<typename T>
struct __queue_packet
{
    T* data = nullptr;                  // 8 bytes
    std::atomic<int> reader_count{ 0 }; // 4 bytes
};
template<typename T>
using news = struct __queue_packet<T>;
/* Concurrent Queue
 * The idea comes from the "LMAX Disruptor" (a concurrent queue built on a ring buffer).
 * ----------------------------------------------------------------
 * writer :
 *   when increasing head, we read the current value of head and then increment it.
 *   [claim ] increment head only when (reader_count of head) == 0 (busy-wait),
 *            then write the data << memcpy >>
 *   [commit] if cursor == claimed index - 1, increment cursor by 1;
 *            otherwise busy-wait
 * reader :
 *   [read  ] increment reader_count of the next slot (the cursor is the reading limit),
 *            decrement reader_count of the previous slot,
 *            and read the data
 */
template<typename T>
class ConcurrentQueue
{
public:
    ConcurrentQueue(size_t num_reader)
    {
        /////////////////////////////
        for (news<T>& e : ring_buffer)
            e.data = new T();
        /////////////////////////////
        ring_buffer[0].reader_count.store((int)num_reader);
        for (size_t i = 0; i < num_reader; i++)
            readers.push_back(0);
    }
    virtual ~ConcurrentQueue()
    {
        for (news<T>& e : ring_buffer)
            delete e.data;
    }
    void write(const T* elem)
    {
        // Claim
        unsigned long long claimed_idx = claim();
        std::memcpy(ring_buffer[idx(claimed_idx)].data, elem, sizeof(T));
        // Commit
        commit(claimed_idx);
    }
    /*
     * Before read : [ c = 1 ] [ c = 0 ] [claimed] [ c = 3 ] [ c = 1 ]
     *                readers             writer              head
     *                          cursor
     *
     * After read  : [ c = 0 ] [ c = 1 ] [claimed] [ c = 3 ] [ c = 1 ]
     *                          readers   writer              head
     *                          cursor
     */
    bool read(const unsigned short reader_id, T* copy_here)
    {
        // Check against the cursor (the cursor is the reading limit)
        unsigned long long now = readers[reader_id];
        if (now == cursor.load()) { return false; }
        else
        {
            size_t next_idx = idx(now + 1);
            ring_buffer[next_idx].reader_count.fetch_add(1);
            std::memcpy(copy_here, ring_buffer[next_idx].data, sizeof(T));
            ring_buffer[idx(now)].reader_count.fetch_sub(1);
            // ERROR FIXED: readers[reader_id] used to be incremented even when
            // the if-condition above was true; it must advance only after a successful read.
            readers[reader_id] += 1;
            return true;
        }
    }
    // Number of sequences claimed by writers so far (head starts at 1)
    unsigned long long get_cumulative_writes() { return head.load() - 1; }
private:
    std::array<news<T>, __CON_QUEUE_SIZE__> ring_buffer;
    std::atomic<unsigned long long> head{ 1 };   // next sequence number a writer will claim
    std::atomic<unsigned long long> cursor{ 0 }; // last committed sequence number (the readers' limit)
    std::vector<unsigned long long> readers;     // last sequence number each reader has consumed
    inline size_t idx(unsigned long long sequence)
    {
        return (sequence & __QUEUE_IDX_MASK__);
    }
    /* The head can advance only when (reader_count == 0)
     * Before claim : [ data ] [claimed] [ c = 0 ] [ c = 3 ] [ c = 1 ]
     *                 reader   writer    head
     *                 cursor
     *
     * After claim  : [ data ] [claimed] [claimed] [ c = 3 ] [ c = 1 ]
     *                 reader   writer    writer    head
     *                 cursor
     */
    unsigned long long claim()
    {
        while (ring_buffer[idx(head.load())].reader_count.load())
        {
            std::this_thread::yield();
        }
        // fetch_add returns the value head held immediately before the increment
        return head.fetch_add(1);
    }
    /* The cursor can advance only when (cursor == claimed - 1)
     * Before commit : [ data ] [claimed] [claimed] [ c = 3 ] [ c = 1 ]
     *                  readers  writer    writer    head
     *                  cursor
     *
     * After commit  : [ data ] [ data ] [claimed] [ c = 3 ] [ c = 1 ]
     *                  readers  readers  writer    head
     *                           cursor
     */
    /*
     * Commit all : [ data ] [ data ] [ data ] [ c = 3 ] [ c = 1 ]
     *               readers  readers           head
     *                                 cursor
     */
    void commit(const unsigned long long claimed)
    {
        unsigned long long expected = claimed - 1;
        while (cursor.load() < expected) { std::this_thread::yield(); }
        cursor.fetch_add(1);
    }
};
@ryul1206 (Author):
C++, Multiple enqueue, multiple dequeue, lock-free, short and simple
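
For anyone trying it out, here is a minimal usage sketch for the multi-writer / multi-reader case, assuming the code above is saved as concurrent_queue.hpp; the Msg struct, the thread counts, and the message total are illustrative assumptions, not part of the gist. Note that every reader observes every committed element, so each consumer loops until it has seen all of the messages.

// Illustrative driver only: concurrent_queue.hpp, Msg, and the counts below are assumptions.
#include "concurrent_queue.hpp"
#include <cstdio>
#include <thread>
#include <vector>

struct Msg { int producer_id; int value; };

int main()
{
    const unsigned short num_readers = 2;
    const int total_msgs = 200; // well below __CON_QUEUE_SIZE__, so the ring never wraps here
    ConcurrentQueue<Msg> queue(num_readers);

    // Two writers enqueue concurrently (multiple enqueue).
    std::vector<std::thread> writers;
    for (int w = 0; w < 2; ++w)
    {
        writers.emplace_back([&queue, w, total_msgs]() {
            for (int i = 0; i < total_msgs / 2; ++i)
            {
                Msg m{ w, i };
                queue.write(&m); // claim a slot, memcpy the payload, then commit
            }
        });
    }

    // Every reader consumes every committed element (multiple dequeue).
    std::vector<std::thread> consumers;
    for (unsigned short r = 0; r < num_readers; ++r)
    {
        consumers.emplace_back([&queue, r, total_msgs]() {
            Msg m;
            int seen = 0;
            while (seen < total_msgs)
            {
                if (queue.read(r, &m)) // false means the cursor has not advanced yet
                    ++seen;
                else
                    std::this_thread::yield();
            }
            std::printf("reader %d consumed %d messages\n", (int)r, seen);
        });
    }

    for (auto& t : writers) t.join();
    for (auto& t : consumers) t.join();
    return 0;
}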
