Skip to content

Instantly share code, notes, and snippets.

@lrodorigo
Last active August 4, 2018 13:51
Show Gist options
  • Save lrodorigo/c4ff16490bc553a48f4add56d0ede78b to your computer and use it in GitHub Desktop.
Save lrodorigo/c4ff16490bc553a48f4add56d0ede78b to your computer and use it in GitHub Desktop.
#ifndef MIND_DDS_PROCESSINGQUEUE_H
#define MIND_DDS_PROCESSINGQUEUE_H
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <third_party/easylogging/easylogging++.h>
#include <MindDDSUtils.h>
template<class T>
class TimedoutQueue {
protected:
struct Element {
T data{};
double timeout{};
std::string id{};
bool operator<(const Element &rhs) {
return timeout > rhs.timeout;
}
};
std::thread loopThread;
std::vector<Element> queue{};
std::condition_variable cond{};
std::mutex lock{};
std::function<void(const T&)> handleDataCallback{};
bool threadStopped{false};
void removeItem(const std::string& id) {
for (auto it = queue.begin(); it != queue.end();) {
if (id == it->id) {
it = queue.erase(it);
} else
++it;
}
std::make_heap(queue.begin(), queue.end()); // riordino
}
void pushInvalidate(const Element &toPush) {
// mi assicuro che sia l'unica chiave in coda con quell'id, e che sia la prima da gestire
removeItem(toPush.id);
queue.push_back(toPush);
std::push_heap(queue.begin(), queue.end());
}
inline int getNextTimewait() {
if (queue.empty())
return -1;
return 1000*( queue.front().timeout - MindDDSUtils::now());
}
int handleFrontElement() {
std::pop_heap(queue.begin(), queue.end()); // moves the largest to the end
auto& element = queue.back();
// LOG(INFO)<<"Gestisco elemento id: "<<element.id;
try {
this->handleDataCallback(element.data);
} catch (std::exception& ex) {
LOG(ERROR)<<"Exception in callback:"<<ex.what();
}
queue.pop_back();
return getNextTimewait();
}
void push(const Element &item) {
std::lock_guard<std::mutex> l(lock);
pushInvalidate(item);
cond.notify_one();
}
public:
explicit TimedoutQueue(std::function<void(const T&)> handleDataCallback)
: handleDataCallback(handleDataCallback),
loopThread(std::bind(&TimedoutQueue<T>::threadLoop, this))
{
std::make_heap(queue.begin(), queue.end());
}
virtual ~TimedoutQueue() {
{
std::lock_guard<std::mutex> l(lock);
this->threadStopped = true;
cond.notify_one();
}
this->loopThread.join();
}
void push(const T& data, const std::string &id, int expirationMillis) {
Element new_elem;
new_elem.timeout = MindDDSUtils::now() + ((double)expirationMillis)/1000.0d;
new_elem.data = data;
new_elem.id = id;
this->push(new_elem);
}
void deleteItem(const std::string& id) {
std::lock_guard<std::mutex> l(lock);
removeItem(id);
cond.notify_one();
}
void printQueue() {
std::cout<<"[ ";
for (auto& e: queue){
std::cout<<e.id<< " ";
}
std::cout<<" ]"<<std::endl;
}
void threadLoop() {
int wait_ms = -1;
while (!threadStopped) {
std::unique_lock<std::mutex> lk(lock); // il mutex è preso
// printQueue();
if (wait_ms < 0) { // coda vuota, attendo il riempimento
// LOG(INFO) << "Coda vuota!";
cond.wait(lk);
if (queue.empty()) {
wait_ms = -1;
continue;
}
if (threadStopped)
return;
// coda diventa piena
wait_ms = getNextTimewait();
// LOG(INFO) << "Coda riempita! Prossima attesa: " << wait_ms;
if (wait_ms > 0)
continue;
} else {
// LOG(INFO)<<"Coda già piena, prossima attesa: "<<wait_ms;
if (cond.wait_for(lk, std::chrono::milliseconds(wait_ms))==std::cv_status::no_timeout) {
// mi sono sbloccato perché è arrivato un nuovo dato o perché ho tolto un elemento
if (queue.empty()) {
wait_ms = -1; // ultimo elemento cancellato
continue;
}
if (threadStopped)
return;
// non vuota
wait_ms = getNextTimewait();
// LOG(INFO) << "Altro elemento aggiunto alla coda! Nuova attesa Calcolata: " << wait_ms;
if (wait_ms > 0)
continue;
}
wait_ms = handleFrontElement();
}
}
}
};
#endif //MIND_DDS_PROCESSINGQUEUE_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment