Skip to content

Instantly share code, notes, and snippets.

@jjaychen1e
Last active October 10, 2023 15:22
Show Gist options
  • Save jjaychen1e/57c9a3b18114064dfd6d26c44f4894d3 to your computer and use it in GitHub Desktop.
Save jjaychen1e/57c9a3b18114064dfd6d26c44f4894d3 to your computer and use it in GitHub Desktop.
lock-free SPSC ring buffer

要求: 实现一个 single-producer single-consumer 的 lock-free ring buffer

思路:

因为只有一个生产者和一个消费者,我们可以通过想办法保证生产者和消费者互不影响对方的状态来消除数据竞争。

设计 ring buffer 时需要考虑如何代表 empty 状态。通常来说我们会使用 start = end = -1 来代表空状态,但是这样的设计会使得 pushpop 操作存在同时修改 startend 的可能性。

我们采用留空一个 slot 的方式(始终有一个 slot 不存放元素;为空时,start = end = indexSlot)来表示空状态。start永远指向被留空的slot,而(end + 1) % capacity 永远指向新元素将被存储的位置。

接下来我们验证这种设计是线程安全的。以下是 pushpop 的代码:

bool RingBuffer::push(int value) {
    if (isFull()) {
        return false;
    }

    int newEnd = (end.load() + 1) % capacity;
    buffer[newEnd] = value;
    end.store(newEnd);
    return true;
}

pair<bool, int> RingBuffer::pop() {
    if (isEmpty()) {
        return make_pair(false, 0);
    }

    int newStart = (start.load() + 1) % capacity;
    int value = buffer[newStart];
    start.store(newStart);
    return make_pair(true, value);
}

首先分析 push 函数,当我们成功跳过 if 语句后,消费者再执行 pop 函数,也并不会影响我们 push 函数的正确性,因为 pop 函数不可能影响我们往 (end + 1) % capacity 位置插入元素(我们留空了一个位置)。因为我们在完成push后才修改end的值,我们使用 atomic 来保证顺序一致性,防止编译器的重排序。

同样的,对于 pop 函数来说,当我们成功跳过 if 语句后,生产者执行 push 函数,也并不会影响 pop 函数的正确性,因为 push 函数不可能往 (start + 1) % capacity 位置插入元素(我们留空了一个位置)。因为我们在完成pop后才修改start的值,我们使用 atomic 来保证顺序一致性,防止编译器的重排序。

以下是完整代码:

#include <iostream>

using namespace std;

class RingBuffer {
public:
    int capacity;
    atomic<int> end;
    atomic<int> start;

    bool push(int value);

    pair<bool, int> pop();

    int length();

    bool isFull();

    bool isEmpty();

    RingBuffer(int capacity) : capacity(capacity + 1) {
        end = 0;
        start = 0;
        buffer = new int[capacity + 1];
    }

    ~RingBuffer() {
        delete[] buffer;
    }

private:
    int *buffer;
};


bool RingBuffer::push(int value) {
    if (isFull()) {
        return false;
    }

    int newEnd = (end.load() + 1) % capacity;
    buffer[newEnd] = value;
    end.store(newEnd);
    return true;
}

pair<bool, int> RingBuffer::pop() {
    if (isEmpty()) {
        return make_pair(false, 0);
    }

    int newStart = (start.load() + 1) % capacity;
    int value = buffer[newStart];
    start.store(newStart);
    return make_pair(true, value);
}

int RingBuffer::length() {
    return (end + capacity - start) % capacity;
}

bool RingBuffer::isFull() {
    return (end + 1) % capacity == start;
}

bool RingBuffer::isEmpty() {
    return end == start;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment