Skip to content

Instantly share code, notes, and snippets.

@carasuca
Created February 9, 2017 07:35
Show Gist options
  • Save carasuca/5efe9f037b9af6aa65402edb4697ccd9 to your computer and use it in GitHub Desktop.
Save carasuca/5efe9f037b9af6aa65402edb4697ccd9 to your computer and use it in GitHub Desktop.
FIFO versions for review. Find the bugs.
#include <vector>
#include <future>
#include <atomic>
static const size_t
buffer_size = 1024*1024,
backlog_bits = 4,
backlog = 2<<backlog_bits; // backlog powers of 2 support overflow
class FIFO
{
std::atomic_size_t read_at, write_at;
std::vector<int> buffers[backlog];
public:
FIFO()
{
for (auto& buffer : buffers)
buffer.reserve(buffer_size);
}
size_t available() const
{
return write_at - read_at;
}
bool produce(int val) // assume only single thread can write
{
if (available() >= backlog)
return false;
buffers[write_at++ % backlog].assign(buffer_size, val);
return true;
}
int* consume()
{
if (!available()) return nullptr;
return buffers[read_at++ % backlog].data();
}
};
#include <list>
#include <mutex>
class FIFO2
{
std::mutex mtx;
std::condition_variable wake_up;
std::list<std::vector<int>> produced, consumed;
std::vector<int> current;
public:
FIFO2() : consumed(backlog)
{
for (auto& c : consumed)
c.reserve(buffer_size);
current.reserve(buffer_size);
}
bool produce(int val, bool enlarge = true)
{
std::unique_lock<std::mutex> lock_(mtx);
wake_up.notify_all();
if (consumed.empty())
{
if (enlarge)
produced.emplace_back(buffer_size, val);
return false;
}
lock_.unlock();
consumed.front().assign(buffer_size, val);
lock_.lock();
produced.splice(produced.cend(), consumed, consumed.cbegin());
return true;
}
int* consume(int ms = 10)
{
std::unique_lock<std::mutex> lock_(mtx);
//if (wake_up.wait_for(lock_, std::chrono::milliseconds(ms)) == std::cv_status::timeout)
// return nullptr;
if (produced.empty()) return 0; // spurious wake-up
consumed.splice(consumed.cend(), produced, produced.cbegin());
current.swap(consumed.back());
return current.data();
}
};
class FIFO3
{
std::atomic_size_t produced_count;
std::list<std::vector<int>> produced, consumed;
std::vector<int> current;
public:
FIFO3() : consumed(backlog)
{
for (auto& c : consumed)
c.reserve(buffer_size);
current.reserve(buffer_size);
}
bool produce(int val)
{
if (produced_count == backlog)
return false;
consumed.front().assign(buffer_size, val);
produced.splice(produced.cend(), consumed, consumed.cbegin());
++produced_count;
return true;
}
int* consume()
{
if (!produced_count)
return nullptr;
consumed.splice(consumed.cend(), produced, produced.cbegin());
current.swap(consumed.back());
--produced_count;
return current.data();
}
};
void wait(int ms = 10)
{
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
template<typename fifo_t>
void Test(fifo_t& fifo = fifo_t())
{
auto producer = std::async([&fifo]{
static int val;
while (1)
if (fifo.produce(val++))
{
//wait();
if (!(val % 100))
printf("produced: %i\n", val);
}
//else puts("Overproduction!");
});
auto consumer = std::async([&fifo]{
static int bored;
while (1)
if (auto ptr = fifo.consume())
{
//wait();
if (!(*ptr % 100))
printf("consumed: %i\n", *ptr);
}
//else printf("Consumer bored\t\t%i\r", bored++);
});
producer.get();
consumer.get();
}
void main()
{
Test<FIFO3>();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment