Created
November 16, 2019 11:05
-
-
Save ryul1206/41bf843ed4bfdbf75c682ea213fc5c86 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 2018. Hong-ryul Jung
#pragma once
#include <array>
#include <atomic>
#include <cstring>
#include <thread>
#include <type_traits>
#include <vector>
// Number of slots in the ring buffer. Must be a power of two so that a
// sequence number can be mapped to a slot with a single bitwise AND.
// NOTE(review): identifiers containing a double underscore are reserved for
// the implementation; the names are kept only because other code refers to them.
#define __CON_QUEUE_SIZE__ 2048 // 1 << 11
// Derived from the size so the two values can never drift apart.
#define __QUEUE_IDX_MASK__ (__CON_QUEUE_SIZE__ - 1)
static_assert(__CON_QUEUE_SIZE__ > 0 && (__CON_QUEUE_SIZE__ & __QUEUE_IDX_MASK__) == 0,
              "__CON_QUEUE_SIZE__ must be a power of two");
// One slot of the ring buffer.
// (Original note: a cache line is 64 bytes. The struct itself is not padded
// or aligned to that size.)
template<typename T>
struct __queue_packet
{
    // Pointer to the element stored in this slot. The packet does not manage
    // the pointee's lifetime; whoever fills `data` is responsible for it.
    T* data = nullptr;                  // 8 bytes
    // Number of readers currently positioned on this slot.
    std::atomic<int> reader_count{ 0 }; // 4 bytes
};

// Short alias for a ring-buffer slot holding a T.
template<typename T>
using news = __queue_packet<T>;
/* Concurrent Queue | |
* The idea is from "LMAX Disruptor" (concurrent queue with ring buffer) | |
* ---------------------------------------------------------------- | |
* writer : | |
* when increase head : we get current index of head and increase head. | |
* [cliam ] increase head - when (reader_count of head) == 0 (busy_wait) | |
* and write data << memcpy >> | |
* [commit] if cursor is index-1, then increase cursor +1 | |
* else busy_waiting | |
* reader : | |
* [read ] increase reader_count of next - but, cursor is reading limit. | |
* and decrease reader_count of before | |
* and read data | |
*/ | |
template<typename T> | |
class ConcurrentQueue | |
{ | |
public: | |
ConcurrentQueue(size_t num_reader) | |
{ | |
///////////////////////////// | |
for (news<T>& e : ring_buffer) | |
e.data = new T(); | |
///////////////////////////// | |
ring_buffer[0].reader_count.store((int)num_reader); | |
for (size_t i = 0; i < num_reader; i++) | |
readers.push_back(0); | |
} | |
virtual ~ConcurrentQueue() | |
{ | |
for (news<T>& e : ring_buffer) | |
delete e.data; | |
} | |
void write(const T* elem) | |
{ | |
// Claim | |
unsigned long long claimed_idx = claim(); | |
std::memcpy(ring_buffer[idx(claimed_idx)].data, elem, sizeof(T)); | |
// Commit | |
commit(claimed_idx); | |
} | |
/* | |
* Before read : [ c = 1 ] [ c = 0 ] [claimed] [ c = 3 ] [ c = 1 ] | |
* readers writer head | |
* cursor | |
* | |
* After read : [ c = 0 ] [ c = 1 ] [claimed] [ c = 3 ] [ c = 1 ] | |
* readers writer head | |
* cursor | |
*/ | |
bool read(const unsigned short reader_id, T* copy_here) | |
{ | |
// Check cursor index (cursor is limitation) | |
unsigned long long now = readers[reader_id]; | |
if (now == cursor.load()) { return false; } | |
else | |
{ | |
size_t next_idx = idx(now + 1); | |
ring_buffer[next_idx].reader_count.fetch_add(1); | |
std::memcpy(copy_here, ring_buffer[next_idx].data, sizeof(T)); | |
ring_buffer[idx(now)].reader_count.fetch_sub(1); | |
// ERROR FIXED. reader_idx was increased, when if_cond is true. | |
readers[reader_id] += 1; | |
return true; | |
} | |
} | |
unsigned long long get_cumulative_writes() { return head.load() - 1; } | |
private: | |
std::array<news<T>, __CON_QUEUE_SIZE__> ring_buffer; | |
std::atomic<unsigned long long> head = 1; | |
std::atomic<unsigned long long> cursor = 0; | |
std::vector<unsigned long long> readers; | |
inline size_t idx(unsigned long long sequence) | |
{ | |
return (sequence & __QUEUE_IDX_MASK__); | |
} | |
/* Head can increase self, only when (count == 0) | |
* Before claim : [ data ] [claimed] [ c = 0 ] [ c = 3 ] [ c = 1 ] | |
* reader writer head | |
* cursor | |
* | |
* After claim : [ data ] [claimed] [claimed] [ c = 3 ] [ c = 1 ] | |
* reader writer writer head | |
* cursor | |
*/ | |
unsigned long long claim() | |
{ | |
while (ring_buffer[idx(head.load())].reader_count.load()) | |
{ | |
std::this_thread::yield(); | |
} | |
// Return: The value immediately preceding the effects of this function | |
return head.fetch_add(1); | |
} | |
/* Cursor can increase self, only when (cursor == claimed - 1) | |
* Before commit : [ data ] [claimed] [claimed] [ c = 3 ] [ c = 1 ] | |
* readers writer writer head | |
* cursor | |
* | |
* After commit : [ data ] [ data ] [claimed] [ c = 3 ] [ c = 1 ] | |
* readers readers writer head | |
* cursor | |
*/ | |
/* | |
* Commit all : [ data ] [ data ] [ data ] [ c = 3 ] [ c = 1 ] | |
* readers readers head | |
* cursor | |
*/ | |
void commit(const unsigned long long claimed) | |
{ | |
unsigned long long expected = claimed - 1; | |
while (cursor.load() < expected) { std::this_thread::yield(); } | |
cursor.fetch_add(1); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
C++, Multiple enqueue, multiple dequeue, lock-free, short and simple