要求: 实现一个 single-producer single-consumer 的 lock-free ring buffer
思路:
因为只有一个生产者和一个消费者,我们可以通过想办法保证生产者和消费者互不影响对方的状态来消除数据竞争。
设计 ring buffer 时需要考虑如何代表 empty
状态。通常来说我们会使用 start = end = -1
来代表空状态,但是这样的设计会使得 push
和 pop
操作存在同时修改 start
和 end
的可能性。
我们采用留空一个 slot
的方式(始终有一个 slot
不存放元素;为空时,start = end = indexSlot
)来表示空状态。start
永远指向被留空的slot
,而(end + 1) % capacity
永远指向新元素将被存储的位置。
接下来我们验证这种设计是线程安全的。以下是 push
和 pop
的代码:
bool RingBuffer::push(int value) {
if (isFull()) {
return false;
}
int newEnd = (end.load() + 1) % capacity;
buffer[newEnd] = value;
end.store(newEnd);
return true;
}
pair<bool, int> RingBuffer::pop() {
if (isEmpty()) {
return make_pair(false, 0);
}
int newStart = (start.load() + 1) % capacity;
int value = buffer[newStart];
start.store(newStart);
return make_pair(true, value);
}
首先分析 push
函数,当我们成功跳过 if
语句后,消费者再执行 pop
函数,也并不会影响我们 push
函数的正确性,因为 pop
函数不可能影响我们往 (end + 1) % capacity
位置插入元素(我们留空了一个位置)。因为我们在完成push
后才修改end
的值,我们使用 atomic
来保证顺序一致性,防止编译器的重排序。
同样的,对于 pop
函数来说,当我们成功跳过 if
语句后,生产者执行 push
函数,也并不会影响 pop
函数的正确性,因为 push
函数不可能往 (start + 1) % capacity
位置插入元素(我们留空了一个位置)。因为我们在完成pop
后才修改start
的值,我们使用 atomic
来保证顺序一致性,防止编译器的重排序。
以下是完整代码:
#include <iostream>
using namespace std;
class RingBuffer {
public:
int capacity;
atomic<int> end;
atomic<int> start;
bool push(int value);
pair<bool, int> pop();
int length();
bool isFull();
bool isEmpty();
RingBuffer(int capacity) : capacity(capacity + 1) {
end = 0;
start = 0;
buffer = new int[capacity + 1];
}
~RingBuffer() {
delete[] buffer;
}
private:
int *buffer;
};
bool RingBuffer::push(int value) {
if (isFull()) {
return false;
}
int newEnd = (end.load() + 1) % capacity;
buffer[newEnd] = value;
end.store(newEnd);
return true;
}
pair<bool, int> RingBuffer::pop() {
if (isEmpty()) {
return make_pair(false, 0);
}
int newStart = (start.load() + 1) % capacity;
int value = buffer[newStart];
start.store(newStart);
return make_pair(true, value);
}
int RingBuffer::length() {
return (end + capacity - start) % capacity;
}
bool RingBuffer::isFull() {
return (end + 1) % capacity == start;
}
bool RingBuffer::isEmpty() {
return end == start;
}