Skip to content

Instantly share code, notes, and snippets.

@jweinst1
Last active April 5, 2022 12:18
Show Gist options
  • Save jweinst1/2eea5f1d2b29133493790e398d400875 to your computer and use it in GitHub Desktop.
Save jweinst1/2eea5f1d2b29133493790e398d400875 to your computer and use it in GitHub Desktop.
experimental time caching db
#include <assert.h>
#include <sys/mman.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <mutex>
#include <new>
#include <thread>
// Reference instant on the steady clock, captured when this translation
// unit's statics are initialized. current_mono_time() reports time relative
// to this instant.
static const auto EPOCH = std::chrono::steady_clock::now();

// Returns the number of whole microseconds that have elapsed on
// std::chrono::steady_clock since EPOCH.
static inline unsigned long long
current_mono_time()
{
    using std::chrono::microseconds;
    const auto elapsed = std::chrono::steady_clock::now() - EPOCH;
    return std::chrono::duration_cast<microseconds>(elapsed).count();
}
// Number of per-thread time slots. It is read when the table below is first
// created, so it must hold its final value before the first call to
// thread_time_instance().
static size_t thread_count = 4;

// Returns the process-wide table of per-thread times (thread_count entries).
//
// The table is allocated on the first call and is never deleted by this code.
// The trailing `()` value-initializes the array so every slot starts at 0;
// without it the atomics would be default-initialized, which before C++20
// leaves their values indeterminate.
static std::atomic<unsigned long long>* thread_time_instance() {
    static std::atomic<unsigned long long>* const times =
        new std::atomic<unsigned long long>[thread_count]();
    return times;
}
// Stores newThreadTime into the time slot belonging to thread `tid`.
// The intended convention is that only the owning thread writes to its own
// slot. `tid` is used as an index without any range check.
static void updateTimeForThread(size_t tid, unsigned long long newThreadTime) {
    std::atomic<unsigned long long>* const table = thread_time_instance();
    table[tid].store(newThreadTime);
}
// Returns true only when every thread's stored time is strictly greater than
// timePoint. Returns false as soon as one slot holding a value <= timePoint is
// found.
static bool isTimeExpiredforThreads(unsigned long long timePoint) {
    std::atomic<unsigned long long>* const table = thread_time_instance();
    size_t tid = 0;
    while (tid < thread_count) {
        const unsigned long long published = table[tid].load();
        if (!(published > timePoint)) {
            return false;
        }
        ++tid;
    }
    return true;
}
// Returns true when the time stored for thread `tid` is strictly greater than
// timePoint. `tid` is used as an index without any range check.
static inline bool isTimeExpiredforThread(size_t tid, unsigned long long timePoint) {
    const unsigned long long published = thread_time_instance()[tid].load();
    return published > timePoint;
}
// One cell of a circular singly linked list used as queue storage.
//   ptr  - atomically accessed payload pointer; nullptr means the cell is empty.
//   next - the following cell in the ring (plain pointer, set once by make_ring).
template<class T>
struct SpScNode {
    std::atomic<T*> ptr;
    SpScNode* next;

    SpScNode(): ptr(nullptr), next(nullptr) {}

    // Allocates `amount` nodes with `new`, links them into a closed ring and
    // returns one node of that ring. The caller owns the nodes.
    //
    // A request for 0 nodes is treated as a request for 1 (a node that links
    // to itself). The previous implementation computed `amount - 2` on an
    // unsigned type, so amounts below 2 wrapped around and allocated until
    // memory ran out.
    static SpScNode<T>* make_ring(size_t amount) {
        const size_t count = (amount == 0) ? 1 : amount;
        SpScNode<T>* head = new SpScNode<T>();
        SpScNode<T>* const tail = head;  // first node created ends up last in the chain
        for (size_t made = 1; made < count; ++made) {
            SpScNode<T>* link = new SpScNode<T>();
            link->next = head;
            head = link;
        }
        tail->next = head;  // close the ring
        return head;
    }
};
template<class T>
class SpSc {
public:
SpSc(size_t size) {
SpScNode<T>* ring = SpScNode<T>::make_ring(size);
_head.store(ring);
_tail.store(ring);
}
inline bool isFull() {
SpScNode<T>* tail = _tail.load();
SpScNode<T>* head = _head.load();
return head == tail && tail->ptr.load() != nullptr;
}
inline bool isEmpty() {
SpScNode<T>* tail = _tail.load();
SpScNode<T>* head = _head.load();
return head == tail && tail->ptr.load() == nullptr;
}
bool push(T* obj) {
SpScNode<T>* tail = _tail.load();
SpScNode<T>* head = _head.load();
if (head == tail) {
T* dummy = nullptr;
if (tail->ptr.compare_exchange_strong(dummy, obj)) {
_tail.store(tail->next);
return true;
} else {
return false;
}
} else {
tail->ptr.store(obj);
_tail.store(tail->next);
return true;
}
}
T* pop() {
SpScNode<T>* head = _head.load();
T* swappedOut = head->ptr.exchange(nullptr);
if (swappedOut != nullptr) {
// advance only if successful
_head.store(head->next);
}
return swappedOut;
}
private:
std::atomic<SpScNode<T>*> _head;
std::atomic<SpScNode<T>*> _tail;
};
// A single worker thread that pulls T* jobs from an SpSc<T> queue and passes
// each one to a handler.
//
// Lifecycle: construct -> start() -> give() any number of times -> stop().
// stop() must have been called before destruction (checked with assert).
template<class T>
class ExecNode {
public:
    // queueSize - capacity handed to the internal SpSc<T> queue.
    // handler   - invoked on the worker thread once per job; copied into the node.
    ExecNode(size_t queueSize, const std::function<void(T*)>& handler):
        _active(false), _shutdown(false), _handler(handler), _queue(queueSize)
    {}

    ~ExecNode() {
        // Must be joined before this object gets destroyed
        assert(!_active.load());
    }

    // Launches the worker thread. Must not be called while the node is active.
    void start() {
        assert(!_active.load());
        _thread = std::thread([this]{
            while (!_shutdown.load()) {
                {
                    // Hold the mutex only while waiting, so give() and stop()
                    // are never blocked behind a running handler.
                    std::unique_lock<std::mutex> hold(_mtx);
                    _cond.wait(hold, [this]{ return !_queue.isEmpty() || _shutdown.load(); });
                }
                // Drain everything currently queued.
                for (T* job = _queue.pop(); job != nullptr; job = _queue.pop()) {
                    _handler(job);
                }
            }
        });
        _active.store(true);
    }

    // Queues a job and wakes the worker.
    // Returns false when the queue was full and the job was not accepted.
    bool give(T* job) {
        bool accepted = false;
        {
            // The wait predicate depends on the queue, so change it under the
            // mutex. Otherwise the push and notify could land between the
            // worker's predicate check and its block, and the wakeup is lost.
            std::lock_guard<std::mutex> hold(_mtx);
            accepted = _queue.push(job);
        }
        _cond.notify_one();
        return accepted;
    }

    // Requests shutdown, wakes the worker and waits for it to exit.
    // Safe to call more than once, or without a preceding start().
    void stop() {
        {
            // Same lost-wakeup reasoning as in give().
            std::lock_guard<std::mutex> hold(_mtx);
            _shutdown.store(true);
        }
        _cond.notify_all(); // only one thread but might as well call this
        if (_thread.joinable()) {
            _thread.join();
        }
        _active.store(false);
    }

private:
    std::atomic<bool> _active;      // true between start() and stop()
    std::atomic<bool> _shutdown;    // set by stop(); ends the worker loop
    std::function<void(T*)> _handler;
    SpSc<T> _queue;
    std::thread _thread;
    std::mutex _mtx;                // guards the condition-variable predicate
    std::condition_variable _cond;
};
// Owns one anonymous, private, read/write memory mapping created with mmap
// and released with munmap.
//
// If the mapping cannot be created, the object is still constructed but
// memory() returns nullptr and size() returns 0, so callers can detect the
// failure and anything sized from size() sees an empty region.
class MappedMemory {
public:
    // Maps mem_size bytes. See the class comment for the failure behaviour.
    explicit MappedMemory(size_t mem_size): _unmapped(false), _mem_size(mem_size), _mem_block(nullptr) {
        void* const region = ::mmap(nullptr, _mem_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
        if (region == MAP_FAILED) {
            // Nothing was mapped: report an empty region and make sure
            // doUnMap() never hands an invalid address to munmap.
            _mem_size = 0;
            _unmapped.store(true);
        } else {
            _mem_block = static_cast<char*>(region);
        }
    }

    // Start of the mapped region, or nullptr if mapping failed.
    // The pointer is not cleared by doUnMap().
    inline void* memory() const {
        return _mem_block;
    }

    // Size of the mapped region in bytes, or 0 if mapping failed.
    inline size_t size() const {
        return _mem_size;
    }

    // Releases the mapping. Only the first call has any effect; the atomic
    // flag makes repeated calls no-ops.
    void doUnMap() {
        bool expected = false;
        if (_unmapped.compare_exchange_strong(expected, true)) {
            ::munmap(_mem_block, _mem_size);
        }
    }

    ~MappedMemory() {
        doUnMap();
    }

private:
    std::atomic<bool> _unmapped;  // true once the region is released (or was never mapped)
    size_t _mem_size;
    char* _mem_block;
};
// TODO: should this be aligned?
//
// One claimable slot of the memory bank.
//   mem_time - time stamp written by the most recent successful claim().
//   next_ptr - link used when slots are chained into a list.
//   value    - payload stored in the slot.
struct MemorySlot {
    std::atomic<unsigned long long> mem_time = 0;
    std::atomic<MemorySlot*> next_ptr;
    std::atomic<size_t> value;

    // Tries to stamp this slot with newTime.
    // Returns false when isTimeExpiredforThreads(newTime) is false, or when
    // mem_time changed between the load and the compare-exchange below.
    // Returns true after mem_time has been replaced with newTime.
    //
    // NOTE(review): the expiry check is made against newTime, not against the
    // slot's currently stored mem_time - confirm this is the intended rule.
    bool claim(unsigned long long newTime) {
        if (!isTimeExpiredforThreads(newTime)) {
            return false;
        }
        unsigned long long observed = mem_time.load();
        return mem_time.compare_exchange_strong(observed, newTime);
    }
};
// Views the bytes of a MappedMemory as an array of MemorySlot and offers
// linear-scan claiming of slots.
//
// The bank keeps a reference to the mapping, so the MappedMemory must outlive
// the bank. The slots are obtained by casting the mapped address; no
// MemorySlot constructor is run by this class.
class MemorySlotBank {
public:
    // The slot count is the mapping size divided by sizeof(MemorySlot),
    // rounded down.
    explicit MemorySlotBank(const MappedMemory& map):
        _slots(reinterpret_cast<MemorySlot*>(map.memory())),
        _slot_count(map.size() / sizeof(MemorySlot)),
        _map(map)
    {}

    // Number of slots in the bank.
    size_t count() const { return _slot_count; }

    // Returns the slot at `index`, or nullptr when index is out of range.
    MemorySlot* at(size_t index) {
        return (index < _slot_count) ? &_slots[index] : nullptr;
    }

    // Scans the slots in order and returns the first one whose
    // claim(claimTime) succeeds, or nullptr when none could be claimed.
    MemorySlot* findBruteForce(unsigned long long claimTime) {
        size_t index = 0;
        while (index < _slot_count) {
            MemorySlot* candidate = _slots + index;
            if (candidate->claim(claimTime)) {
                return candidate;
            }
            ++index;
        }
        // Todo better allocating
        return nullptr;
    }

    // Scans the slots in order, claiming up to *amount of them and chaining
    // the claimed slots through next_ptr in scan order.
    //
    // On return *amount has been decremented once per claimed slot, so it
    // holds the number of requested slots that could NOT be obtained.
    // Returns the first claimed slot, or nullptr when nothing was claimed.
    // The last claimed slot's next_ptr is set to nullptr.
    MemorySlot* findBruteList(unsigned long long claimTime, size_t* amount) {
        MemorySlot* first = nullptr;
        MemorySlot* last = nullptr;
        for (size_t index = 0; index < _slot_count && *amount != 0; ++index) {
            MemorySlot* candidate = _slots + index;
            if (!candidate->claim(claimTime)) {
                continue;
            }
            if (first == nullptr) {
                first = candidate;
            } else {
                last->next_ptr.store(candidate);
            }
            last = candidate;
            *amount -= 1;
        }
        if (last != nullptr) {
            // Terminate the chain so no stale link is left at the end.
            last->next_ptr.store(nullptr);
        }
        return first;
    }

private:
    MemorySlot* _slots;
    size_t _slot_count;
    const MappedMemory& _map;
};
// Fills a 10-cell SpSc<int> queue and checks that the values come back out in
// the order they were pushed.
//
// push() and pop() are called outside assert() so the queue is still
// exercised when asserts are compiled out with NDEBUG.
static void spscTest() {
    constexpr size_t kCount = 10;
    SpSc<int> q(kCount);
    int nums[kCount] = {};  // only the element addresses are used
    for (size_t i = 0; i < kCount; ++i)
    {
        const bool pushed = q.push(nums + i);
        assert(pushed);
        (void)pushed;  // silences the unused warning under NDEBUG
    }
    for (size_t j = 0; j < kCount; ++j)
    {
        int* const popped = q.pop();
        assert(popped == (nums + j));
        (void)popped;
    }
}
static void execnodeTest() {
auto fn = [](int* ptr) { *ptr += 4; };
ExecNode exn(5, std::function<void(int*)>(fn));
exn.start();
int foo = 5;
assert(exn.give(&foo));
while (foo != 9) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
exn.stop();
}
// Entry point: creates a 1 GiB anonymous mapping (released automatically when
// `map` goes out of scope) and runs the two self-tests.
int main(int argc, char const *argv[])
{
    constexpr size_t kMapBytes = 1024 * 1024 * 1024;
    MappedMemory map(kMapBytes);

    spscTest();
    execnodeTest();

    // Disabled: MappedMemory has no slotsSize() member.
    //std::printf("total slots %zu\n", map.slotsSize());
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment