Skip to content

Instantly share code, notes, and snippets.

@pdib
Last active May 11, 2017 09:49
Show Gist options
  • Save pdib/96e008f42fb5c614b4e9d87c4813be5d to your computer and use it in GitHub Desktop.
Save pdib/96e008f42fb5c614b4e9d87c4813be5d to your computer and use it in GitHub Desktop.
#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1
#include <iostream>
#include <thread>
#include <array>
#include <atomic>
#include <memory>
#include <Windows.h>
#include <variant>
#include <vector>
using namespace std;
// Number of task slots. It sizes the slots and sems arrays, and main() starts one consumer thread per slot.
constexpr size_t SLOTS = 200;
class Semaphore
{
public:
Semaphore()
{
hSemaphore = CreateSemaphore(
nullptr,
0,
1,
nullptr);
}
~Semaphore()
{
CloseHandle(hSemaphore);
}
Semaphore(Semaphore const&) = delete;
Semaphore& operator=(Semaphore const&) = delete;
Semaphore(Semaphore&& old)
{
std::swap(hSemaphore, old.hSemaphore);
}
Semaphore& operator=(Semaphore&& rhs)
{
std::swap(this->hSemaphore, rhs.hSemaphore);
}
void Wait()
{
WaitForSingleObject(hSemaphore, 0L);
}
void Signal()
{
ReleaseSemaphore(hSemaphore, 1L, nullptr);
}
private:
HANDLE hSemaphore;
};
// Slot marker stored by the producer once all workloads are issued; a consumer that sees it stops polling.
struct Done {};
// Slot marker for an empty slot: the producer may fill it, and a consumer stores it back after finishing a workload.
struct NoTask {};
// A unit of work carrying the integer payload handed to doWork().
struct Workload { int value; };
// Contents of one slot: empty (NoTask), finished (Done), or holding work (Workload).
using Task = variant<NoTask, Done, Workload>;
// One atomically loaded/stored task slot per consumer thread (consumer i polls slots[i]).
// NOTE(review): the code relies on every slot initially holding NoTask; confirm that static
// initialization of atomic<Task> guarantees this on the target toolchain.
array<atomic<Task>, SLOTS> slots;
// sems[i] is signaled by the producer right after it stores into slots[i].
array<Semaphore, SLOTS> sems;
int findNextFreeSlot(int startIdx)
{
auto current = startIdx;
while (!holds_alternative<NoTask>(slots[current].load()))
{
current = (current + 1) % SLOTS;
}
return current;
}
// Producer thread body: publishes 30'000 workloads into free slots, then places a
// Done marker into SLOTS free slots. Every store is followed by a signal on that
// slot's semaphore.
void producerTask()
{
    constexpr int workloadCount = 30'000;

    // Phase 1: hand out the workloads, resuming each search where the last one ended.
    int cursor = 0;
    for (int value = 0; value != workloadCount; ++value)
    {
        cursor = findNextFreeSlot(cursor);
        slots[cursor].store(Workload{ value });
        sems[cursor].Signal();
    }

    // Phase 2: all work has been issued; restart from slot 0 and post the Done markers.
    cursor = 0;
    for (size_t remaining = SLOTS; remaining != 0; --remaining)
    {
        cursor = findNextFreeSlot(cursor);
        slots[cursor].store(Done{});
        sems[cursor].Signal();
    }
}
// Processes one workload: writes its value, followed by a newline, to standard output.
void doWork(Workload w)
{
    std::cout << w.value;
    std::cout << "\n";
}
// Visitor a consumer applies to the current contents of its slot.
// Performs the work and empties the slot when the slot holds a workload.
// Every overload returns true to keep polling the slot, or false once there is nothing left to do.
struct ConsumerTaskVisitor
{
    ConsumerTaskVisitor(int mySlot) : slotIndex{ mySlot } {}

    // Empty slot: call Wait() on this slot's semaphore, then poll again.
    bool operator()(NoTask)
    {
        sems[slotIndex].Wait();
        return true;
    }

    // The producer has finished: stop polling.
    bool operator()(Done)
    {
        return false;
    }

    // A workload is available: process it and mark the slot empty again.
    bool operator()(Workload w)
    {
        // The order of steps (A) and (B) is a tuning choice that depends on the relative
        // speed of the producer and the consumers:
        // - Slow producer: free slots are plentiful, so keep the slot occupied while working,
        //   i.e. (A) then (B). The producer skips this busy slot and picks another free one,
        //   whose consumer can start immediately.
        // - Fast producer: slots are usually full, so releasing the slot before working,
        //   i.e. (B) then (A), may let the producer get to the next workload sooner.
        doWork(w); // (A)
        slots[slotIndex].store(NoTask{}); // (B)
        return true;
    }

private:
    int slotIndex;
};
void consumerTask(int mySlot)
{
auto visitor = ConsumerTaskVisitor{ mySlot };
bool keepGoing = true;
while (keepGoing)
keepGoing = visit(visitor, slots[mySlot].load());
}
int main()
{
vector<thread> threads;
threads.push_back(thread{ producerTask });
for (int i = 0; i < SLOTS; i++)
{
threads.push_back(thread{ consumerTask, i });
}
for (auto&& t : threads)
{
t.join();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment