Skip to content

Instantly share code, notes, and snippets.

@mrange
Last active April 14, 2019 21:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrange/6827470fc6408d24e966cf9cf36d4e9b to your computer and use it in GitHub Desktop.
Save mrange/6827470fc6408d24e966cf9cf36d4e9b to your computer and use it in GitHub Desktop.
message processor C++
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
namespace
{
// A mailbox with a dedicated worker thread.
//
// Messages may be enqueued from any thread; they are handed one at a time, in
// FIFO order, to the processor callback, which always runs on the worker
// thread owned by this object. The processor receives an enqueuer (so it can
// post follow-up messages to its own mailbox) together with the message, and
// returns false to stop the processing loop.
//
// The type is neither copyable nor movable: the worker thread captures `this`
// and the mutex / condition variable cannot be relocated.
template<typename TMessage>
struct mailbox_processor
{
  using message_t   = TMessage;
  using enqueuer_t  = std::function<void (message_t &&)>;
  using processor_t = std::function<bool (enqueuer_t const &, message_t &&)>;

  mailbox_processor ()                                      = delete;
  mailbox_processor (mailbox_processor const &)             = delete;
  mailbox_processor & operator= (mailbox_processor const &) = delete;
  // Moves were previously "= default", which the compiler silently turned
  // into deleted functions (std::mutex is immovable). Say so explicitly.
  mailbox_processor (mailbox_processor &&)                  = delete;
  mailbox_processor & operator= (mailbox_processor &&)      = delete;

  // poison_pill is used by the destructor to kill the processor;
  // the processor must return false when it receives the poison_pill.
  //
  // The worker thread is started here. `worker` is the last data member, so
  // every other member is already initialised when the thread begins to run.
  mailbox_processor (message_t poison_pill, processor_t processor)
    : poison_pill (std::move (poison_pill))
    , processor   (std::move (processor))
    , worker      ([this] { process (); })
  {
  }

  // Posts the poison pill and blocks until the worker thread has finished.
  ~mailbox_processor () noexcept
  {
    // Remember the id up front: once joined, a std::thread no longer
    // reports the id of the thread it used to own.
    auto const id = worker.get_id ();
    log (id, "Going down...");
    enqueue (poison_pill);
    worker.join ();
    log (id, "Done");
  }

  // Enqueues a copy of msg.
  void enqueue (message_t const & msg)
  {
    message_t copy = msg;
    enqueue (std::move (copy));
  }

  // Moves msg to the back of the queue and wakes the worker.
  void enqueue (message_t && msg)
  {
    log (worker.get_id (), "Enqueuing message");
    {
      std::lock_guard<std::mutex> lock (mtx);
      messages.push_back (std::move (msg));
    }
    // Notify after releasing the lock; only the worker ever waits on cv.
    cv.notify_one ();
  }

private:
  // Blocks until a message is available, then removes and returns the oldest.
  message_t pop ()
  {
    std::unique_lock<std::mutex> lock (mtx);
    cv.wait (lock, [this] { return !messages.empty (); });
    // Predicate above ensures not empty
    assert (!messages.empty ());
    auto message = std::move (messages.front ());
    messages.pop_front ();
    return message;
  }

  // Writes one log line, prefixed with the given thread id in hex.
  //
  // The line is formatted in a private stream and written in one piece, so
  // std::hex never sticks to std::cout (which would make every later integer
  // printed by the program come out in hex) and a line is handed to std::cout
  // as a single string.
  static void log (std::thread::id id, char const * text, char const * detail = "")
  {
    std::ostringstream line;
    line
      << "Message processor: "
      << std::hex
      << id
      << " - "
      << text
      << detail
      << '\n'
      ;
    std::cout << line.str () << std::flush;
  }

  // Worker thread entry point: pops messages and feeds them to the processor
  // until it returns false or throws. Exceptions are logged and end the loop;
  // they never escape the thread.
  void process ()
  {
    // Use this thread's own id instead of worker.get_id (): the constructor
    // may still be initialising `worker` when this function starts running.
    auto const id = std::this_thread::get_id ();
    try
    {
      enqueuer_t enqueuer = [this] (message_t && msg) { enqueue (std::move (msg)); };
      log (id, "Process loop started");
      bool cont = true;
      while (cont)
      {
        auto message = pop ();
        log (id, "Message received");
        cont = processor (enqueuer, std::move (message));
      }
      log (id, "Process loop terminated");
    }
    catch (std::exception const & e)
    {
      log (id, "Something bad happened, message processor died: ", e.what ());
    }
    catch (...)
    {
      log (id, "Something bad happened, message processor died");
    }
  }

  message_t               poison_pill ;
  processor_t             processor   ;
  std::mutex              mtx         ;
  std::condition_variable cv          ;
  std::deque<message_t>   messages    ;
  // Must stay the last member: the thread uses all of the members above.
  std::thread             worker      ;
};
// Demo message type: a plain aggregate carrying only a message number.
struct my_message
{
int msg_no;
};
// Sentinel message (msg_no == -1) that main passes to mailbox_processor as its
// poison pill; my_processor returns false when it receives this number.
my_message my_poison_pill { -1 };
bool my_processor (std::function<void (my_message &&)> enqueuer, my_message && msg)
{
std::cout << "Received message: " << msg.msg_no << std::endl;
if (msg.msg_no == my_poison_pill.msg_no)
{
std::cout << "Poison pill received, return false" << std::endl;
return false;
}
else if (msg.msg_no == 1)
{
std::cout << "Hello!" << std::endl;
return true;
}
else if (msg.msg_no == 2)
{
std::cout << "There!" << std::endl;
return true;
}
else
{
std::cout << "Oops!" << std::endl;
return true;
}
}
}
// Demo entry point: starts a mailbox processor, posts messages 1, 2 and 3,
// then waits for a line on standard input. When main returns, the
// mailbox_processor destructor posts the poison pill and joins the worker.
int main()
{
  mailbox_processor<my_message> mbp (my_poison_pill, &my_processor);

  for (int msg_no = 1; msg_no <= 3; ++msg_no)
  {
    mbp.enqueue (my_message {msg_no});
  }

  std::cout << "Press new-line to exit" << std::endl;

  // The content of the line is irrelevant; reading it just blocks until Enter.
  std::string ignored;
  std::getline (std::cin, ignored);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment