Last active
August 4, 2018 13:51
-
-
Save lrodorigo/c4ff16490bc553a48f4add56d0ede78b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef MIND_DDS_PROCESSINGQUEUE_H | |
#define MIND_DDS_PROCESSINGQUEUE_H | |
#include <condition_variable> | |
#include <vector> | |
#include <functional> | |
#include <thread> | |
#include <iostream> | |
#include <third_party/easylogging/easylogging++.h> | |
#include <queue> | |
#include <MindDDSUtils.h> | |
template<class T> | |
class TimedoutQueue { | |
protected: | |
struct Element { | |
T data{}; | |
double timeout{}; | |
std::string id{}; | |
bool operator<(const Element &rhs) { | |
return timeout > rhs.timeout; | |
} | |
}; | |
std::thread loopThread; | |
std::vector<Element> queue{}; | |
std::condition_variable cond{}; | |
std::mutex lock{}; | |
std::function<void(const T&)> handleDataCallback{}; | |
bool threadStopped{false}; | |
void removeItem(const std::string& id) { | |
for (auto it = queue.begin(); it != queue.end();) { | |
if (id == it->id) { | |
it = queue.erase(it); | |
} else | |
++it; | |
} | |
std::make_heap(queue.begin(), queue.end()); // riordino | |
} | |
void pushInvalidate(const Element &toPush) { | |
// mi assicuro che sia l'unica chiave in coda con quell'id, e che sia la prima da gestire | |
removeItem(toPush.id); | |
queue.push_back(toPush); | |
std::push_heap(queue.begin(), queue.end()); | |
} | |
inline int getNextTimewait() { | |
if (queue.empty()) | |
return -1; | |
return 1000*( queue.front().timeout - MindDDSUtils::now()); | |
} | |
int handleFrontElement() { | |
std::pop_heap(queue.begin(), queue.end()); // moves the largest to the end | |
auto& element = queue.back(); | |
// LOG(INFO)<<"Gestisco elemento id: "<<element.id; | |
try { | |
this->handleDataCallback(element.data); | |
} catch (std::exception& ex) { | |
LOG(ERROR)<<"Exception in callback:"<<ex.what(); | |
} | |
queue.pop_back(); | |
return getNextTimewait(); | |
} | |
void push(const Element &item) { | |
std::lock_guard<std::mutex> l(lock); | |
pushInvalidate(item); | |
cond.notify_one(); | |
} | |
public: | |
explicit TimedoutQueue(std::function<void(const T&)> handleDataCallback) | |
: handleDataCallback(handleDataCallback), | |
loopThread(std::bind(&TimedoutQueue<T>::threadLoop, this)) | |
{ | |
std::make_heap(queue.begin(), queue.end()); | |
} | |
virtual ~TimedoutQueue() { | |
{ | |
std::lock_guard<std::mutex> l(lock); | |
this->threadStopped = true; | |
cond.notify_one(); | |
} | |
this->loopThread.join(); | |
} | |
void push(const T& data, const std::string &id, int expirationMillis) { | |
Element new_elem; | |
new_elem.timeout = MindDDSUtils::now() + ((double)expirationMillis)/1000.0d; | |
new_elem.data = data; | |
new_elem.id = id; | |
this->push(new_elem); | |
} | |
void deleteItem(const std::string& id) { | |
std::lock_guard<std::mutex> l(lock); | |
removeItem(id); | |
cond.notify_one(); | |
} | |
void printQueue() { | |
std::cout<<"[ "; | |
for (auto& e: queue){ | |
std::cout<<e.id<< " "; | |
} | |
std::cout<<" ]"<<std::endl; | |
} | |
void threadLoop() { | |
int wait_ms = -1; | |
while (!threadStopped) { | |
std::unique_lock<std::mutex> lk(lock); // il mutex è preso | |
// printQueue(); | |
if (wait_ms < 0) { // coda vuota, attendo il riempimento | |
// LOG(INFO) << "Coda vuota!"; | |
cond.wait(lk); | |
if (queue.empty()) { | |
wait_ms = -1; | |
continue; | |
} | |
if (threadStopped) | |
return; | |
// coda diventa piena | |
wait_ms = getNextTimewait(); | |
// LOG(INFO) << "Coda riempita! Prossima attesa: " << wait_ms; | |
if (wait_ms > 0) | |
continue; | |
} else { | |
// LOG(INFO)<<"Coda già piena, prossima attesa: "<<wait_ms; | |
if (cond.wait_for(lk, std::chrono::milliseconds(wait_ms))==std::cv_status::no_timeout) { | |
// mi sono sbloccato perché è arrivato un nuovo dato o perché ho tolto un elemento | |
if (queue.empty()) { | |
wait_ms = -1; // ultimo elemento cancellato | |
continue; | |
} | |
if (threadStopped) | |
return; | |
// non vuota | |
wait_ms = getNextTimewait(); | |
// LOG(INFO) << "Altro elemento aggiunto alla coda! Nuova attesa Calcolata: " << wait_ms; | |
if (wait_ms > 0) | |
continue; | |
} | |
wait_ms = handleFrontElement(); | |
} | |
} | |
} | |
}; | |
#endif //MIND_DDS_PROCESSINGQUEUE_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment