Last active
April 14, 2019 21:48
-
-
Save mrange/6827470fc6408d24e966cf9cf36d4e9b to your computer and use it in GitHub Desktop.
message processor C++
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cassert> | |
#include <condition_variable> | |
#include <deque> | |
#include <iostream> | |
#include <functional> | |
#include <memory> | |
#include <mutex> | |
#include <string> | |
#include <thread> | |
#include <utility> | |
namespace | |
{ | |
template<typename TMessage> | |
struct mailbox_processor | |
{ | |
using message_t = TMessage ; | |
using enqueuer_t = std::function<void (message_t &&)> ; | |
using processor_t = std::function<bool (enqueuer_t const &, message_t &&)>; | |
mailbox_processor () = delete ; | |
mailbox_processor (mailbox_processor const &) = delete ; | |
mailbox_processor &operator= (mailbox_processor const &) = delete ; | |
mailbox_processor (mailbox_processor && ) = default ; | |
mailbox_processor& operator= (mailbox_processor && ) = default ; | |
// poison_pill is used by the destructor to kill the processor | |
// the processor must return false when it receives the poison_pill | |
mailbox_processor (message_t poison_pill, processor_t processor) | |
: poison_pill (std::move (poison_pill)) | |
, processor (std::move(processor) ) | |
, worker ([this] { process (); } ) | |
{ | |
} | |
~mailbox_processor () noexcept | |
{ | |
log () | |
<< "Going down..." | |
<< std::endl | |
; | |
enqueue (poison_pill); | |
worker.join (); | |
log () | |
<< "Done" | |
<< std::endl | |
; | |
} | |
void enqueue (message_t const & msg) | |
{ | |
message_t copy = msg; | |
enqueue (std::move (copy)); | |
} | |
void enqueue (message_t && msg) | |
{ | |
log () | |
<< "Enqueuing message" | |
<< std::endl | |
; | |
{ | |
std::lock_guard<std::mutex> lock (mtx); | |
messages.push_back (std::move (msg)); | |
} | |
cv.notify_all (); | |
} | |
private: | |
message_t pop () | |
{ | |
std::unique_lock<std::mutex> lock(mtx); | |
cv.wait (lock, [this] { return !messages.empty (); }); | |
// Predicate above ensures not empty | |
assert(!messages.empty ()); | |
auto message = std::move (messages.front ()); | |
messages.pop_front (); | |
return message; | |
} | |
std::ostream & log () const | |
{ | |
return | |
std::cout | |
<< "Message processor: " | |
<< std::hex | |
<< worker.get_id () | |
<< " - " | |
; | |
} | |
void process () | |
{ | |
try | |
{ | |
enqueuer_t enqueuer = [this] (message_t && msg) { enqueue (std::move (msg)); }; | |
log () | |
<< "Process loop started" | |
<< std::endl | |
; | |
bool cont = true; | |
while (cont) | |
{ | |
auto message = pop (); | |
log () | |
<< "Message received" | |
<< std::endl | |
; | |
cont &= this->processor (enqueuer, std::move (message)); | |
} | |
log () | |
<< "Process loop terminated" | |
<< std::endl | |
; | |
} | |
catch (std::exception const & e) | |
{ | |
log () | |
<< "Something bad happened, message processor died: " | |
<< e.what () | |
<< std::endl | |
; | |
} | |
catch (...) | |
{ | |
log () | |
<< "Something bad happened, message processor died" | |
<< std::endl | |
; | |
} | |
} | |
message_t poison_pill ; | |
processor_t processor ; | |
std::mutex mtx ; | |
std::condition_variable cv ; | |
std::deque<message_t> messages ; | |
std::thread worker ; | |
}; | |
struct my_message | |
{ | |
int msg_no; | |
}; | |
my_message my_poison_pill { -1 }; | |
bool my_processor (std::function<void (my_message &&)> enqueuer, my_message && msg) | |
{ | |
std::cout << "Received message: " << msg.msg_no << std::endl; | |
if (msg.msg_no == my_poison_pill.msg_no) | |
{ | |
std::cout << "Poison pill received, return false" << std::endl; | |
return false; | |
} | |
else if (msg.msg_no == 1) | |
{ | |
std::cout << "Hello!" << std::endl; | |
return true; | |
} | |
else if (msg.msg_no == 2) | |
{ | |
std::cout << "There!" << std::endl; | |
return true; | |
} | |
else | |
{ | |
std::cout << "Oops!" << std::endl; | |
return true; | |
} | |
} | |
} | |
int main() | |
{ | |
mailbox_processor<my_message> mbp (my_poison_pill, &my_processor); | |
mbp.enqueue (my_message {1}); | |
mbp.enqueue (my_message {2}); | |
mbp.enqueue (my_message {3}); | |
std::cout << "Press new-line to exit" << std::endl; | |
std::string line; | |
std::getline(std::cin, line); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment