Skip to content

Instantly share code, notes, and snippets.

@pcordes
Created January 7, 2020 22:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pcordes/edcaf9089b02e787c6878e880c1f135a to your computer and use it in GitHub Desktop.
Save pcordes/edcaf9089b02e787c6878e880c1f135a to your computer and use it in GitHub Desktop.
bittnkr/uniq non-atomic testcase.cpp
// https://github.com/bittnkr/uniq/issues/2
// compile with g++ -m32 -O1 -pthread -Wall
// based on sandbox/hello.cpp but parameterized for qtype instead of int
// and queuing values that have non-zero high halves (and aren't all the same as each other!!!)
#include <pthread.h>
#include <stdio.h>
#include <thread>
#include <vector>
#include <atomic>
using namespace std;
inline bool CompareAndSwap(int *destination, int currentValue, int newValue) {
return __sync_bool_compare_and_swap(destination, currentValue, newValue);
}
using qtype = uint64_t;
class Queue {
private:
int _size, _head, _tail, mask; // FIXME: non-atomic head/tail makes spinning on empty/full a possible infinite loop
vector<qtype> buffer;
vector<char> isfree;
protected:
int tail() {
while (full())
sched_yield();
return _tail;
}
int head() {
while (empty())
sched_yield();
return _head;
}
public:
Queue(int size = 32) {
_size = size;
_tail = _head = 0;
mask = _size - 1;
buffer = vector<qtype>(_size, 0);
isfree = vector<char>(_size, 1);
}
int size() { return _size; }
int count() { return _tail - _head; }
int empty() { return _tail == _head; }
int full() { return _size == count(); }
void push(qtype item) {
int t;
do
t = tail();
while (!isfree[t & mask] || !CompareAndSwap(&_tail, t, t + 1));
isfree[t &= mask] = 0;
buffer[t] = item;
}
qtype pop() {
int h;
do
h = head();
while (isfree[h & mask] || !CompareAndSwap(&_head, h, h + 1));
qtype r = buffer[h &= mask];
isfree[h] = 1;
return r;
}
};
Queue Q;
atomic<qtype> Total(0); // a checsum, to ensure that all items pushed are poped
// this is the producer thread, it pushes data into the queue
void producer(int items)
{
qtype sum = 0;
for (int i = 1; i <= items; i+=2) {
qtype q0 = (1ULL<<32) + i + 0; // value with non-zero high half can race against buffer[t] = 0
qtype q1 = (0ULL<<32) + i + 1;
sum += q0 + q1;
Q.push(q0);
Q.push(q1);
// Q.push(1);
// Q.push(1); sum += 2;
}
Q.push(-1); // signal termination with a -1
Total += sum;
printf("Produced: %'9d, sum = %#llx\n", items, sum);
}
void consumer() // the consumer thread, takes data from the queue
{
qtype v, sum = 0;
int count = 0;
while ((v = Q.pop()) != -1ULL) {
sum += v;
count++;
}
Total -= sum;
printf("Consumed: %'9d, sum = %#llx\n", count, sum);
}
int main() {
int pairs = 1, Items = 1000*1000;
// pthread_t p[pairs], c[pairs];
vector<thread> producers(pairs);
vector<thread> consumers(pairs);
for (int i = 0; i < pairs; i++) {
consumers[i] = thread(consumer);
producers[i] = thread(producer, Items / pairs);
}
// Wait consumers finish the job
for (int i = 0; i < pairs; i++) {
producers[i].join();
consumers[i].join();
}
printf("\nChecksum: %#llx (it must be zero)\n", Total.load());
return 0;
}
// Part of uniQ library released under GNU 3.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment