Skip to content

Instantly share code, notes, and snippets.

@jhosteny
Created May 6, 2019 13:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhosteny/ce4f65bd2f950189711a66b42dc13390 to your computer and use it in GitHub Desktop.
Save jhosteny/ce4f65bd2f950189711a66b42dc13390 to your computer and use it in GitHub Desktop.
test.cpp
#include <queue>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <algorithm>
#include <random>
#include <functional>
#include <boost/circular_buffer.hpp>
#include <boost/program_options.hpp>
#include <iostream>
using namespace std;
class Item {
public:
enum Status
{
QUEUED,
SENT,
ACKED,
FAILED,
TIMEDOUT
};
private:
int m_SeqNum;
Status m_Status;
public:
int seqNo() { return m_SeqNum; }
Status getStatus() { return m_Status; }
void setStatus(Status status) { m_Status = status; }
Item(int seqNum) : m_SeqNum(seqNum), m_Status(QUEUED) {}
};
struct ShutdownException : public exception
{
};
template <typename T>
class Queue {
public:
mutex m_Mutex;
condition_variable m_ConditionVariable;
deque<T> m_Queue;
bool m_Shutdown;
mt19937 m_RandomNumberGenerator;
void push(T &t)
{
{
lock_guard<mutex> lk(m_Mutex);
if (m_Shutdown) {
return;
}
m_Queue.push_back(t);
}
m_ConditionVariable.notify_one();
}
void shuffle()
{
lock_guard<mutex> lk(m_Mutex);
std::shuffle(m_Queue.begin(), m_Queue.end(), m_RandomNumberGenerator);
}
T pop()
{
unique_lock<mutex> lk(m_Mutex);
m_ConditionVariable.wait(lk, [this]{ return (m_Queue.size() > 0) || m_Shutdown; });
if (m_Shutdown && (m_Queue.size() == 0)) {
throw ShutdownException();
}
T& t = m_Queue.front();
m_Queue.pop_front();
return t;
}
void shutdown()
{
{
lock_guard<mutex> lk(m_Mutex);
m_Shutdown = true;
}
m_ConditionVariable.notify_one();
}
Queue() : m_Shutdown(false), m_RandomNumberGenerator(random_device()()) {}
};
class MQTTDevice
{
public:
enum CallStatus {
SUCCESS,
TIMEOUT,
ERROR
};
static const char* statusName(CallStatus status)
{
static const char* statusNames[] =
{
"Success",
"Timeout",
"Error"
};
return statusNames[status];
}
void callback(void *context, CallStatus status);
};
class Client
{
private:
// Helper class
class CallContext {
private:
void* m_Context;
public:
CallContext(void *context) : m_Context(context) {}
void* context() { return m_Context; }
};
typedef function<void(void *, MQTTDevice::CallStatus)> CallbackFunc;
CallbackFunc m_CallbackFunc;
Queue<CallContext *> m_Ingress;
thread* m_Thread;
mt19937 m_RandomNumberGenerator;
discrete_distribution<int> m_StatusDistribution;
uniform_int_distribution<int> m_UniformPercentDistribution;
normal_distribution<float> m_SleepDistribution;
int m_ShufflePercent;
int m_DelayPercent;
void loop()
{
while (true)
{
try {
CallContext *callp = m_Ingress.pop();
if (m_UniformPercentDistribution(m_RandomNumberGenerator) < m_ShufflePercent) {
m_Ingress.shuffle();
}
void *contextp = callp->context();
delete callp;
if (m_UniformPercentDistribution(m_RandomNumberGenerator) < m_DelayPercent)
{
int sleep_ms = (int)m_SleepDistribution(m_RandomNumberGenerator);
if (sleep_ms > 0)
{
this_thread::sleep_for(chrono::milliseconds(sleep_ms));
}
}
MQTTDevice::CallStatus status;
status = static_cast<MQTTDevice::CallStatus>(m_StatusDistribution(m_RandomNumberGenerator));
m_CallbackFunc(contextp, status);
}
catch (ShutdownException &e) {
break;
}
}
}
public:
Client(
CallbackFunc cb,
int shuffle_percent,
int delay_percent,
int fail_percent,
int sleep_mean_ms,
int sleep_stddev_ms) :
m_CallbackFunc(cb),
m_Thread(NULL),
m_RandomNumberGenerator(random_device()()),
m_StatusDistribution({ 100.0 - fail_percent, fail_percent / 2.0, fail_percent / 2.0 }),
m_UniformPercentDistribution(0, 99),
m_SleepDistribution(sleep_mean_ms, sleep_stddev_ms),
m_ShufflePercent(shuffle_percent),
m_DelayPercent(delay_percent) {}
void start()
{
m_Thread = new thread(&Client::loop, this);
}
void join()
{
m_Ingress.shutdown();
m_Thread->join();
delete m_Thread;
}
void Send(void *contextp)
{
CallContext *callContextp = new CallContext(contextp);
m_Ingress.push(callContextp);
}
};
class Robot : public MQTTDevice
{
private:
enum Mode
{
OPERATIONAL,
DEGRADED
};
boost::circular_buffer<Item *> m_SendBuffer;
int m_NextSeqNum;
int m_InFlight;
const int m_MaxInFlight;
mutex m_Mutex;
condition_variable m_ConditionVariable;
Mode m_Mode;
Client m_Client;
public:
Robot(
int max_queued,
int max_in_flight,
int fail_percent,
int shuffle_percent,
int delay_percent,
int sleep_mean_ms,
int sleep_stddev_ms) :
m_NextSeqNum(0),
m_InFlight(0),
m_MaxInFlight(max_in_flight),
m_Mode(OPERATIONAL),
m_SendBuffer(max_queued),
m_Client(
bind(&Robot::callback, this, placeholders::_1, placeholders::_2),
shuffle_percent,
delay_percent,
fail_percent,
sleep_mean_ms,
sleep_stddev_ms) {}
void run(int iterations)
{
m_Client.start();
thread sendTh(&Robot::sendAll, this, iterations);
sendTh.join();
m_Client.join();
}
void debug(const char* prefix, int seqNo, const char* suffix=NULL)
{
if (suffix)
{
printf("%9s SeqNo: %8d (Q: %3d IF: %1d) %s\n", prefix, seqNo, (short)m_SendBuffer.size(), m_InFlight, suffix);
}
else
{
printf("%9s SeqNo: %8d (Q: %3d IF: %1d)\n", prefix, seqNo, (short)m_SendBuffer.size(), m_InFlight);
}
}
bool sendOne(Item *itemp)
{
if (m_Mode == OPERATIONAL && (m_InFlight < m_MaxInFlight))
{
m_InFlight += 1;
itemp->setStatus(Item::Status::SENT);
m_Client.Send(itemp);
debug("Sending", itemp->seqNo());
return true;
}
return false;
}
void markStatus(Item *itemp, CallStatus status)
{
for (int i=0; i<min(m_MaxInFlight, (int)m_SendBuffer.size()); i++)
{
if (m_SendBuffer[i]->seqNo() == itemp->seqNo())
{
if (status == CallStatus::SUCCESS)
{
itemp->setStatus(Item::Status::ACKED);
}
else
{
itemp->setStatus(Item::Status::FAILED);
if (m_Mode != DEGRADED)
{
cout << endl << "State: DEGRADED" << endl << endl;
m_Mode = DEGRADED;
}
}
}
}
}
bool checkAcks()
{
bool clearedSpace = false;
while ((m_InFlight > 0) &&
(m_SendBuffer[0]->getStatus() == Item::Status::ACKED))
{
m_InFlight--;
debug("", m_SendBuffer[0]->seqNo(), "<------ FINISHED ");
Item* p = m_SendBuffer[0];
m_SendBuffer.pop_front();
delete p;
clearedSpace = true;
if (m_InFlight == 0)
{
if (m_Mode == DEGRADED)
{
cout << endl << "State: OPERATIONAL" << endl << endl;
m_Mode = OPERATIONAL;
}
}
}
return clearedSpace;
}
void trySendPending()
{
for (int i=0; i<min(m_MaxInFlight, (int)m_SendBuffer.size()); i++)
{
Item::Status status = m_SendBuffer[i]->getStatus();
if (status == Item::Status::QUEUED)
{
if (false == sendOne(m_SendBuffer[i]))
{
break;
}
}
else if (status == Item::Status::ACKED)
{
assert((i != 0) && ("head of queue should not be in acked state" != 0));
}
else if (status == Item::Status::SENT)
{
}
else if (status == Item::Status::FAILED || status == Item::Status::TIMEDOUT)
{
m_SendBuffer[i]->setStatus(Item::Status::SENT);
m_Client.Send(m_SendBuffer[i]);
debug("Resending", m_SendBuffer[i]->seqNo());
break;
}
else
{
assert("unknown status" == 0);
}
}
}
void callback(void *contextp, CallStatus status)
{
Item *itemp = static_cast<Item*>(contextp);
bool spaceAvailable = false;
{
lock_guard<mutex> lk(m_Mutex);
char buf[32];
sprintf(buf, "%s", MQTTDevice::statusName(status));
debug("Received", itemp->seqNo(), buf);
markStatus(itemp, status);
spaceAvailable = checkAcks();
trySendPending();
}
if (spaceAvailable) {
m_ConditionVariable.notify_one();
}
}
void sendAll(int iterations)
{
unique_lock<mutex> lk(m_Mutex);
while (m_NextSeqNum < iterations)
{
m_ConditionVariable.wait(lk, [this]{ return (m_SendBuffer.size() < m_SendBuffer.capacity()); });
while ((m_SendBuffer.size() < m_SendBuffer.capacity()) &&
(m_NextSeqNum < iterations))
{
Item *itemp = new Item(m_NextSeqNum);
m_NextSeqNum += 1;
m_SendBuffer.push_back(itemp);
debug("Queued", itemp->seqNo());
trySendPending();
}
}
m_ConditionVariable.wait(lk, [this]{ return (m_SendBuffer.size() == 0); });
}
};
int main(int argc, char **argv)
{
try
{
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("iterations,i", po::value<int>()->default_value(1024), "Number of items to send")
("in-flight,f", po::value<int>()->default_value(8), "Max number of items in flight")
("queued,q", po::value<int>()->default_value(128), "Max number of items queued")
("fail-percent", po::value<int>()->default_value(20), "Percent of calls that simulate failure")
("shuffle-percent", po::value<int>()->default_value(5), "Percent of calls that shuffle ingress queue")
("delay-percent", po::value<int>()->default_value(5), "Percent of calls that delay ingress queue")
("sleep-mean", po::value<int>()->default_value(20), "Sleep normal distribution mean")
("sleep-stddev", po::value<int>()->default_value(5), "Sleep normal distribution standard deviation");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if (vm.count("help"))
{
cout << "MQTT Simulator" << endl << endl
<< desc << endl;
return 0;
}
po::notify(vm);
}
catch(po::error& e)
{
cerr << "Error parsing options: " << e.what() << endl << endl;
cerr << desc << endl;
return 1;
}
int iterations = vm["iterations"].as<int>();
int max_queued = vm["queued"].as<int>();
int max_in_flight = vm["in-flight"].as<int>();
int fail_percent = vm["fail-percent"].as<int>();
int shuffle_percent = vm["shuffle-percent"].as<int>();
int delay_percent = vm["delay-percent"].as<int>();
int sleep_mean_ms = vm["sleep-mean"].as<int>();
int sleep_stddev_ms = vm["sleep-stddev"].as<int>();
Robot robot(max_queued, max_in_flight, fail_percent, shuffle_percent, delay_percent, sleep_mean_ms, sleep_stddev_ms);
robot.run(iterations);
}
catch(exception& e)
{
cerr << "Unhandled exception in main: " << e.what() << endl;
return 2;
}
return 0;
}
@jhosteny
Copy link
Author

jhosteny commented May 6, 2019

clang++ -std=c++11 -o test test.cpp -lboost_system -lpthread -lboost_program_options

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment