Skip to content

Instantly share code, notes, and snippets.

@jeb2239
Created April 10, 2021 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeb2239/13919bcebec6e3c617926f5677a7b2ec to your computer and use it in GitHub Desktop.
Save jeb2239/13919bcebec6e3c617926f5677a7b2ec to your computer and use it in GitHub Desktop.
fgl.
#include <sys/time.h>
//#include "queues.h"
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>
// Defines the interface for the Fixed Size Queue
template <typename T>
class FixedSizeQueueInterface
{
public:
    virtual ~FixedSizeQueueInterface() = default;

    // Pops the next item into *data. Returns true on success, false when
    // nothing could be read. Marked "waitfree" by the original author.
    virtual bool Read(T *data) = 0;

    // Pushes a copy of 'data'. Returns true on success, false when nothing
    // could be written. Marked "waitfree" by the original author.
    virtual bool Write(const T &data) = 0;

    // Returns true when the queue currently holds no items.
    virtual bool isEmpty() = 0;
};

// Fixed-size queue built from `table_count` ring buffers ("shards"), each
// guarded by its own std::mutex.
//
// Every calling thread is given a numeric id the first time it touches a
// queue; `id % table_count` selects the single shard that thread reads from
// and writes to. Read and Write only try_lock that shard, so they return
// false instead of waiting when another thread holds it.
//
// A slot is "occupied" when its std::optional holds a value; that is what
// distinguishes a full ring from an empty one (head == end in both cases).
//
// NOTE(review): Read only inspects the caller's own shard, so an item written
// by a thread that maps to a different shard is not visible to the reader.
// Confirm this per-thread affinity is the intended design.
// NOTE(review): `consumer_id` is shared by all instances for a given T while
// `next_consumer_id` is per instance, so ids are only unique among threads
// that first touched the same instance.
// TODO (original author): update this to use reader/writer locks.
template <typename T>
class MutexFixedSizeQueue : public FixedSizeQueueInterface<T>
{
public:
    // Creates the queue. `max_size` is the capacity of EACH shard; negative
    // values are treated as 0, which yields a queue that rejects every
    // Read and Write.
    explicit MutexFixedSizeQueue(int max_size)
        : table(table_count), max_size(max_size < 0 ? 0 : max_size)
    {
        // std::mutex makes Entry non-movable, so the shards are created in
        // place by the vector's count constructor above and only their
        // storage is sized here.
        for (Entry &entry : table)
        {
            entry.q.resize(static_cast<std::size_t>(this->max_size));
        }
    }

    // Moves the oldest item of the calling thread's shard into *data.
    // Returns true on success. Returns false, leaving *data untouched, when
    // `data` is null, the shard is empty, the queue has zero capacity, or
    // the shard's mutex is currently held by another thread.
    bool Read(T *data) override
    {
        if (data == nullptr || max_size == 0)
        {
            return false;
        }
        Entry &entry = table[get_consumer_id() % table_count];
        std::unique_lock<std::mutex> lock(entry.mut, std::try_to_lock);
        if (!lock.owns_lock())
        {
            return false;
        }
        std::optional<T> &slot = entry.q[entry.head];
        if (!slot.has_value())
        {
            return false; // shard is empty
        }
        // Hand the value to the caller before freeing the slot.
        *data = std::move(*slot);
        slot.reset();
        entry.head = (entry.head + 1) % entry.q.size();
        return true;
    }

    // Copies 'data' into the calling thread's shard. Returns true on
    // success. Returns false when the shard is full, the queue has zero
    // capacity, or the shard's mutex is currently held by another thread.
    bool Write(const T &data) override
    {
        if (max_size == 0)
        {
            return false;
        }
        Entry &entry = table[get_consumer_id() % table_count];
        std::unique_lock<std::mutex> lock(entry.mut, std::try_to_lock);
        if (!lock.owns_lock())
        {
            return false;
        }
        std::optional<T> &slot = entry.q[entry.end];
        if (slot.has_value())
        {
            return false; // next slot still occupied: shard is full
        }
        slot = data;
        entry.end = (entry.end + 1) % entry.q.size();
        return true;
    }

    // Returns true when no shard holds an item. Unlike Read/Write this
    // blocks: it locks every shard (always in table order) so the answer
    // reflects a single consistent snapshot of the whole queue.
    bool isEmpty() override
    {
        std::vector<std::unique_lock<std::mutex>> locks;
        locks.reserve(table.size());
        for (Entry &entry : table)
        {
            locks.emplace_back(entry.mut);
        }
        for (const Entry &entry : table)
        {
            // Occupied slots are contiguous starting at head, so the head
            // slot alone tells whether the shard holds anything. Comparing
            // head with end would misreport a completely full ring as empty.
            if (!entry.q.empty() && entry.q[entry.head].has_value())
            {
                return false;
            }
        }
        return true;
    }

private:
    // Id of the current thread; 0 means "not assigned yet".
    thread_local static std::uint64_t consumer_id;
    std::mutex consumer_id_mutex;        // guards next_consumer_id
    std::uint64_t next_consumer_id = 1;  // ids start at 1 so 0 can mean "unset"

    // Returns the calling thread's id, assigning the next free one on the
    // thread's first call.
    std::uint64_t get_consumer_id()
    {
        if (!consumer_id)
        {
            std::lock_guard<std::mutex> guard(consumer_id_mutex);
            consumer_id = next_consumer_id;
            ++next_consumer_id;
        }
        return consumer_id;
    }

    // One shard: a ring buffer plus the mutex that protects it.
    struct Entry
    {
        std::mutex mut;
        std::vector<std::optional<T>> q; // slots; empty optional == free slot
        std::size_t head = 0;            // index of the next slot to read
        std::size_t end = 0;             // index of the next slot to write
    };

    static constexpr std::size_t table_count = 4; // number of shards
    std::vector<Entry> table;
    int max_size; // per-shard capacity, never negative
};

template <typename T>
thread_local std::uint64_t MutexFixedSizeQueue<T>::consumer_id;
// Plain pair of ints used as the sample queue payload.
// A default-constructed Data holds -1 in both fields.
struct Data
{
    int a;
    int b;

    // Default state: both fields set to -1.
    Data() : Data(-1, -1) {}

    // Stores x in `a` and y in `b`.
    Data(int x, int y) : a{x}, b{y} {}
};
int main(int argc, char *argv[]) {
MutexFixedSizeQueue<Data> mq(4);
mq.Write(Data(3, 4));
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment