Last active
April 5, 2022 12:18
-
-
Save jweinst1/2eea5f1d2b29133493790e398d400875 to your computer and use it in GitHub Desktop.
experimental time caching db
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <sys/mman.h>

#include <assert.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional> // std::function — used by ExecNode; was missing from the original include set
#include <mutex>
#include <new>
#include <thread>
// Process-wide reference point, captured once during static initialization.
static const auto EPOCH = std::chrono::steady_clock::now();

// Returns microseconds elapsed since EPOCH, measured on the monotonic clock
// (never goes backwards, unaffected by wall-clock adjustments).
static inline unsigned long long
current_mono_time()
{
    using namespace std::chrono;
    const auto elapsed = steady_clock::now() - EPOCH;
    return static_cast<unsigned long long>(duration_cast<microseconds>(elapsed).count());
}
// Number of worker threads tracked by the per-thread time table below.
static size_t thread_count = 4;

// Lazily creates the per-thread time table: one atomic counter per thread.
// The array is never freed — it must outlive every thread that touches it.
static std::atomic<unsigned long long>* thread_time_instance() {
    static std::atomic<unsigned long long>* const table =
        new std::atomic<unsigned long long>[thread_count];
    return table;
}
// Publish `newThreadTime` as thread `tid`'s current time.
// Intended protocol (per the original author's note): each thread writes
// only its own slot; other threads merely read it.
static void updateTimeForThread(size_t tid, unsigned long long newThreadTime) {
    std::atomic<unsigned long long>* table = thread_time_instance();
    table[tid].store(newThreadTime);
}
// True only when EVERY tracked thread's published time is strictly greater
// than `timePoint` — i.e. no thread can still be operating at or before it.
static bool isTimeExpiredforThreads(unsigned long long timePoint) {
    std::atomic<unsigned long long>* table = thread_time_instance();
    size_t i = 0;
    while (i < thread_count) {
        if (table[i].load() <= timePoint)
            return false; // at least one thread has not moved past timePoint
        ++i;
    }
    return true;
}
static inline bool isTimeExpiredforThread(size_t tid, unsigned long long timePoint) { | |
return thread_time_instance()[tid] > timePoint; | |
} | |
// One node of the circular ring backing SpSc. `ptr == nullptr` means the
// slot is empty; `next` links the ring (fixed after construction).
template<class T>
struct SpScNode {
    std::atomic<T*> ptr;
    SpScNode* next;
    SpScNode(): ptr(nullptr), next(nullptr) {}
    // Builds a circular singly-linked ring of `amount` nodes.
    // FIX: the original looped `amount - 2` times unconditionally, so any
    // amount < 2 underflowed size_t and attempted ~2^64 allocations.
    // amount == 0 is clamped to a single self-linked node (smallest valid ring).
    static SpScNode<T>* make_ring(size_t amount) {
        SpScNode<T>* head = new SpScNode();
        head->next = head; // valid 1-node ring
        for (size_t i = 1; i < amount; ++i)
        {
            // splice each new node in right after head; ring stays circular
            SpScNode<T>* link = new SpScNode();
            link->next = head->next;
            head->next = link;
        }
        return head;
    }
};
// Bounded single-producer / single-consumer queue of raw T* payloads,
// backed by a fixed ring of `size` nodes (clamped to at least 1).
// The queue does NOT own the payload pointers pushed into it.
template<class T>
class SpSc {
public:
    SpSc(size_t size) {
        SpScNode<T>* ring = SpScNode<T>::make_ring(size);
        _head.store(ring);
        _tail.store(ring);
    }
    // FIX: the ring nodes were leaked. Walk the circle once and free every
    // node. Payloads still queued are the caller's responsibility.
    ~SpSc() {
        SpScNode<T>* start = _head.load();
        SpScNode<T>* node = start;
        do {
            SpScNode<T>* next = node->next;
            delete node;
            node = next;
        } while (node != start);
    }
    // Non-copyable (was already implicitly deleted via the atomic members).
    SpSc(const SpSc&) = delete;
    SpSc& operator=(const SpSc&) = delete;
    // Full when the producer has wrapped around onto the consumer and that
    // shared slot is still occupied.
    inline bool isFull() {
        SpScNode<T>* tail = _tail.load();
        SpScNode<T>* head = _head.load();
        return head == tail && tail->ptr.load() != nullptr;
    }
    // Empty when both cursors coincide on a vacant slot.
    inline bool isEmpty() {
        SpScNode<T>* tail = _tail.load();
        SpScNode<T>* head = _head.load();
        return head == tail && tail->ptr.load() == nullptr;
    }
    // Producer side. Returns false when the queue is full.
    bool push(T* obj) {
        SpScNode<T>* tail = _tail.load();
        SpScNode<T>* head = _head.load();
        if (head == tail) {
            // Ambiguous case (empty or full): CAS distinguishes — it only
            // succeeds if the shared slot is vacant.
            T* dummy = nullptr;
            if (tail->ptr.compare_exchange_strong(dummy, obj)) {
                _tail.store(tail->next);
                return true;
            } else {
                return false; // full
            }
        } else {
            tail->ptr.store(obj);
            _tail.store(tail->next);
            return true;
        }
    }
    // Consumer side. Returns nullptr when the queue is empty.
    T* pop() {
        SpScNode<T>* head = _head.load();
        T* swappedOut = head->ptr.exchange(nullptr);
        if (swappedOut != nullptr) {
            // advance only if a payload was actually taken
            _head.store(head->next);
        }
        return swappedOut;
    }
private:
    std::atomic<SpScNode<T>*> _head; // consumer cursor
    std::atomic<SpScNode<T>*> _tail; // producer cursor
};
template<class T> | |
class ExecNode { | |
public: | |
ExecNode(size_t queueSize, const std::function<void(T*)>& handler): | |
_active(false), _shutdown(false), _handler(handler), _queue(queueSize) | |
{} | |
~ExecNode() { | |
// Must be joined before this object gets destroyed | |
assert(!_active.load()); | |
} | |
void start() { | |
assert(!_active.load()); | |
_thread = std::thread([this]{ | |
while (!this->_shutdown.load()) { | |
std::unique_lock<std::mutex> hold(this->_mtx); | |
this->_cond.wait(hold, [this]{ return !this->_queue.isEmpty() || this->_shutdown.load();}); | |
T* popped = this->_queue.pop(); | |
while (popped != nullptr) { | |
this->_handler(popped); | |
popped = this->_queue.pop(); | |
} | |
} | |
}); | |
_active.store(true); | |
} | |
bool give(T* job) { | |
bool res = _queue.push(job); | |
_cond.notify_one(); | |
return res; | |
} | |
void stop() { | |
_shutdown.store(true); | |
_cond.notify_all(); // only one thread but might as well call this | |
_thread.join(); | |
_active.store(false); | |
} | |
private: | |
std::atomic<bool> _active; | |
std::atomic<bool> _shutdown; | |
std::function<void(T*)> _handler; | |
SpSc<T> _queue; | |
std::thread _thread; | |
std::mutex _mtx; | |
std::condition_variable _cond; | |
}; | |
// RAII owner of an anonymous, private, read-write mmap'd region.
// Non-copyable (the atomic member deletes the copy operations).
class MappedMemory {
public:
    explicit MappedMemory(size_t mem_size): _unmapped(false), _mem_size(mem_size) {
        void* block = ::mmap(NULL, _mem_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
        // FIX: mmap signals failure with MAP_FAILED ((void*)-1), not NULL.
        // The original stored the result unchecked, so memory() handed out a
        // poison pointer and munmap() was later invoked on it.
        if (block == MAP_FAILED) {
            _mem_block = nullptr;
            _mem_size = 0;            // size() now reflects "no mapping"
            _unmapped.store(true);    // nothing to unmap
        } else {
            _mem_block = static_cast<char*>(block);
        }
    }
    // Base of the mapping, or nullptr when the mapping failed.
    inline void* memory() const {
        return _mem_block;
    }
    // Byte length of the mapping (0 when the mapping failed).
    inline size_t size() const {
        return _mem_size;
    }
    // Releases the mapping exactly once, even under concurrent callers or a
    // doUnMap() followed by destruction.
    void doUnMap() {
        bool dummy = false;
        if (_unmapped.compare_exchange_strong(dummy, true)) {
            ::munmap(_mem_block, _mem_size);
        }
    }
    ~MappedMemory() {
        doUnMap();
    }
private:
    std::atomic<bool> _unmapped; // true once munmap has run (or mapping failed)
    size_t _mem_size;
    char* _mem_block;
};
//todo should be aligned ? | |
struct MemorySlot { | |
std::atomic<unsigned long long> mem_time = 0; | |
std::atomic<MemorySlot*> next_ptr; | |
std::atomic<size_t> value; | |
bool claim(unsigned long long newTime) { | |
if (isTimeExpiredforThreads(newTime)) { | |
unsigned long long currentTime = mem_time.load(); | |
return mem_time.compare_exchange_strong(currentTime, newTime); | |
} else { | |
return false; | |
} | |
} | |
}; | |
// Carves a MappedMemory region into an array of MemorySlot entries and hands
// them out via the time-stamped claim() protocol.
// NOTE(review): holds the MappedMemory by const reference — the map must
// outlive this bank, and slots dangle after map.doUnMap().
class MemorySlotBank {
public:
    // Reinterprets the raw mapping as MemorySlot[]. Relies on fresh
    // anonymous mmap pages being zero-filled so every slot's mem_time
    // starts at 0 (i.e. unclaimed).
    explicit MemorySlotBank(const MappedMemory& map): _slots(reinterpret_cast<MemorySlot*>(map.memory())),
                                                      _slot_count(map.size()/sizeof(MemorySlot)),
                                                      _map(map) {}
    // Total number of slots that fit in the mapping.
    size_t count() const { return _slot_count; }
    // Bounds-checked slot access; nullptr when index is out of range.
    MemorySlot* at(size_t index) {
        if (index >= _slot_count)
            return nullptr;
        else
            return &_slots[index];
    }
    // Linear scan: returns the first slot successfully claimed at
    // `claimTime`, or nullptr when every claim attempt failed.
    MemorySlot* findBruteForce(unsigned long long claimTime) {
        for (size_t i = 0; i < _slot_count; ++i)
        {
            MemorySlot* slot = &_slots[i];
            if (slot->claim(claimTime)) {
                return slot;
            }
        }
        // Todo better allocating
        return nullptr;
    }
    // Claims up to *amount slots and chains them through next_ptr into a
    // singly linked list headed by the returned slot. *amount is decremented
    // once per claimed slot, so on return it holds the shortfall (0 when the
    // full request was satisfied). Returns nullptr when nothing was claimed.
    MemorySlot* findBruteList(unsigned long long claimTime, size_t* amount) {
        MemorySlot* baseList = nullptr; // head of the result list
        MemorySlot* stringer = nullptr; // tail being extended
        for (size_t i = 0; i < _slot_count; ++i)
        {
            // checked before each claim so a request of 0 claims nothing
            if (*amount == 0)
                break;
            MemorySlot* slot = &_slots[i];
            if (slot->claim(claimTime)) {
                if (baseList == nullptr) {
                    baseList = slot;
                    stringer = baseList;
                } else {
                    stringer->next_ptr.store(slot);
                    stringer = slot;
                }
                *amount -= 1;
            }
        }
        if (stringer != nullptr) {
            // Don't leave false pointers at the end
            stringer->next_ptr.store(nullptr);
        }
        return baseList;
    }
private:
    MemorySlot* _slots;       // first slot inside the mapping (not owned)
    size_t _slot_count;       // capacity in slots
    const MappedMemory& _map; // lifetime association; otherwise unused here
};
static void spscTest() { | |
SpSc<int> q(10); | |
int nums [10]; | |
for (size_t i = 0; i < 10; ++i) | |
{ | |
assert(q.push(nums + i)); | |
} | |
for (size_t j = 0; j < 10; ++j) | |
{ | |
assert(q.pop() == (nums + j)); | |
} | |
} | |
static void execnodeTest() { | |
auto fn = [](int* ptr) { *ptr += 4; }; | |
ExecNode exn(5, std::function<void(int*)>(fn)); | |
exn.start(); | |
int foo = 5; | |
assert(exn.give(&foo)); | |
while (foo != 9) { | |
std::this_thread::sleep_for(std::chrono::seconds(1)); | |
} | |
exn.stop(); | |
} | |
// Entry point: reserves a 1 GiB anonymous mapping (unused by the tests
// below) and runs the queue and executor smoke tests.
int main(int argc, char const *argv[])
{
    MappedMemory map(1024 * 1024 * 1024);
    spscTest();
    execnodeTest();
    //std::printf("total slots %zu\n", map.slotsSize()); // NOTE(review): stale — MappedMemory has no slotsSize(); see MemorySlotBank::count()
    return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment