@pfirsich
Last active April 26, 2022 10:59
Dmitry Vyukov's thread-safe multiple-producer single-consumer queue with wait-free production, ported to C++. I read the source code carefully and added lots of comments explaining what it does (it took me a while to figure it all out).
// g++ -std=c++17 -Wall -Wextra -O0 -g -pthread -o log_test log_test.cpp
// (the original comment omitted the input file; log_test.cpp is assumed)
#include <atomic>
#include <optional>
// Vyukov MPSC (wait-free multiple producers, single consumer) queue
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
template <typename T>
class MpscQueue {
public:
    MpscQueue()
        : stub_()
        , consumeEnd_(&stub_)
        , produceEnd_(&stub_)
    {
    }

    void produce(T&& value)
    {
        produce(new Node { std::move(value), nullptr });
    }
    std::optional<T> consume()
    {
        auto node = consumeEnd_.load();
        auto next = node->next.load();
        // If we are supposed to consume the stub, then the list is either empty (nullopt)
        // or this is the first time we consume, in which case we just move consumeEnd ahead.
        if (node == &stub_) {
            if (!next) {
                return std::nullopt;
            }
            consumeEnd_.store(next);
            node = next;
            next = node->next.load();
        }
        if (next) {
            consumeEnd_.store(next);
            return unpackNode(node);
        }
        // If we don't have a `next` element, `node` should be the last item in the list,
        // unless a new item was produced since we last loaded consumeEnd.
        // If there was, we need to try from the start (because there would be a `next`).
        // Instead of calling consume recursively (dangerous), we just bail and let the
        // caller retry.
        // I am fairly sure you could leave this check out completely and it would still
        // work correctly, but it would be less efficient.
        if (node != produceEnd_.load()) {
            return std::nullopt;
        }
        // If the check above did not bail (i.e. node == produceEnd_), the state of the
        // list should be:
        // stub -> node (consumeEnd, produceEnd) -> nullptr
        // Since we have no next item to make the new consumeEnd, we need to put stub_
        // into the queue again.
        stub_.next.store(nullptr);
        produce(&stub_);
        // Now we have either attached stub to `node` or to another element that other
        // producer threads might have added in the meantime.
        // In case another producer has already exchanged produceEnd (so stub_ is attached
        // to its new element), but that producer has not yet attached its element to
        // `node` (i.e. it did not set node->next yet), the condition below (`if (next)`)
        // would be false.
        // Assuming one other producer thread, the list would look like this (next != NULL):
        // node (consumeEnd) *(-> elem) -> stub (produceEnd)
        // or this (next is NULL):
        // node (consumeEnd) -X- elem -> stub (produceEnd)
        // The latter case is what Vyukov refers to when saying that the consumer is
        // blocking (see source link).
        next = node->next.load();
        if (next) {
            consumeEnd_.store(next);
            return unpackNode(node);
        }
        // If the other thread has not managed to attach the new element to `node` yet, we
        // have no other choice but to wait for it to finish, so we return nullopt.
        return std::nullopt;
    }
private:
    struct Node {
        T value;
        // "next" in the order of consumption
        std::atomic<Node*> next;
    };

    static T unpackNode(Node* node)
    {
        auto value = std::move(node->value);
        delete node;
        return value;
    }

    void produce(Node* node)
    {
        // exchange is a single atomic read-modify-write, so production is wait-free.
        // Between the exchange and the store below the chain is temporarily broken,
        // which is the window in which consume() can observe a "blocked" queue.
        auto prev = produceEnd_.exchange(node);
        prev->next.store(node);
    }

    // This is not an actual element of the queue, but simply a place to "park" consumeEnd
    // when there is nothing to consume.
    // Sadly this makes default constructibility of T a requirement.
    // Note there is no destructor, so any nodes still in the queue when it is destroyed
    // are leaked.
    Node stub_;
    // Yes, screw "head" and "tail" and everyone doing whatever they please with those words.
    std::atomic<Node*> consumeEnd_;
    std::atomic<Node*> produceEnd_;
};
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>
// You might have to tweak these so you sometimes get longer, shorter, and empty queues.
// You probably also want to make sure the queue size is not continuously increasing.
constexpr int maxNumProduces = 4;
constexpr size_t numThreads = 8;
MpscQueue<std::string> queue;
std::atomic<size_t> size { 0 };
std::atomic<size_t> producerId { 0 };
void producerThreadFunc()
{
    auto id = producerId++;
    size_t line = 0;
    while (true) {
        // Note: std::rand() is not guaranteed to be thread-safe, but it is good
        // enough for a smoke test.
        const auto num = std::rand() % maxNumProduces;
        for (int i = 0; i < num; ++i) {
            queue.produce("[" + std::to_string(id) + "] log " + std::to_string(line++) + "\n");
            size++;
        }
        std::cout << "queue size: " << size.load() << std::endl;
        ::usleep(1);
    }
}
int main()
{
    std::vector<std::thread> producerThreads(numThreads);
    for (auto& thread : producerThreads) {
        thread = std::thread { producerThreadFunc };
    }
    // The threads run forever and are never joined; terminate with Ctrl-C.
    while (true) {
        const auto line = queue.consume();
        if (!line) {
            ::usleep(1);
            continue;
        }
        size--;
        std::cout << *line;
    }
}