Skip to content

Instantly share code, notes, and snippets.

@mpusz
Created October 27, 2011 20:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mpusz/1320743 to your computer and use it in GitHub Desktop.
Save mpusz/1320743 to your computer and use it in GitHub Desktop.
Simple TTCN TE implementation
#include "utils.h"
#include <map>
#include <set>
#include <vector>
#include <tuple>
#include <iostream>
#include <algorithm>
#include <exception>
#include <atomic>
#include <future>
typedef std::tuple<std::string, int> CMessage;
class CLogging : CNonCopyable {
CConcurrent<std::ostream &> _logger;
void Write(const std::string &msg) const
{
_logger([=](std::ostream & out){ out << msg << std::endl; });
}
public:
CLogging() : _logger{std::cout} { }
void LogCreated(const std::string &comp) const
{ Write("Component '" + comp + "' created"); }
void LogConnected(const std::string &comp1, const std::string &comp2) const
{ Write("Components '" + comp1 + "' and '" + comp2 + "' connected"); }
void LogStarting(const std::string &comp) const
{ Write("Starting component '" + comp + "'"); }
void LogSending(const std::string &comp, const CMessage &m) const
{ Write("Component '" + std::get < 0 > (m) + "' is sending '" + std::to_string(std::get < 1 > (m)) + "' to '" + comp + "'"); }
void LogReceived(const std::string &comp, const CMessage &m) const
{ Write("'" + std::to_string(std::get < 1 > (m)) + "' received by Component '" + comp + "'"); }
template<typename Rep, typename Period>
void LogTimeout(const std::string &comp, const std::chrono::duration<Rep, Period> &duration) const
{ Write("'" + std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()) + "ms' expired while waiting for data in '" + comp + "'"); }
void LogStopped(const std::string &comp) const
{ Write("Component '" + comp + "' stopped"); }
void LogKilled(const std::string &comp) const
{ Write("Component '" + comp + "' killed"); }
void LogVerdict(const std::string &comp, bool verdict) const
{ Write("Component '" + comp + "' verdict '" + std::string(verdict ? "PASS" : "FAIL") + "'"); }
void LogOther(const std::string &str) const
{ Write(str); }
};
class CComponent;
class CBehaviour;
class CComponentHandler : CNonCopyable {
typedef std::map<std::string, std::shared_ptr<CComponent>> CComponentMap;
const CLogging &_log;
CMonitor<CComponentMap> _componentMap;
std::shared_ptr<CComponent> Component(const std::string &name) const
{
try {
return _componentMap([&](CComponentMap &map) { return map.at(name); });
}
catch(const std::out_of_range &) {
throw std::invalid_argument("Component with name '" + name + "' not found");
}
}
public:
CComponentHandler(const CLogging &log): _log(log) {}
void Create(const std::string &name);
void Connect(const std::string &name1, const std::string &name2);
void Start(const std::string &name, const CBehaviour &behaviour);
void Send(const std::string &name, const CMessage &m) const;
void Done(const std::string &name) const;
bool Verdict(const std::string &name) const;
};
class CBehaviour {
CComponentHandler &_ch;
protected:
CComponentHandler &CH() const { return _ch; }
public:
CBehaviour(CComponentHandler &ch): _ch(ch) {}
virtual void Run(CComponent &comp) const = 0;
};
class EThreadStopped : public std::exception {};
class EThreadKilled : public std::exception {};
class CEventObject;
typedef std::tuple<unsigned, CEventObject *> CEvent;
typedef std::vector<CEvent> CEventList;
class CEventQueue {
static const std::chrono::milliseconds RESOLUTION;
typedef std::queue<CEvent> CQueue;
const std::atomic_bool &_kill;
CQueue _queue;
std::mutex _mutex;
std::condition_variable _newItemReady;
public:
CEventQueue(const std::atomic_bool &killed): _kill(killed) {}
std::mutex &Mutex() { return _mutex; }
void PushLocked(CEvent event)
{
bool wasEmpty = _queue.empty();
_queue.push(event);
if(wasEmpty)
_newItemReady.notify_one();
}
CEvent PopWait()
{
std::unique_lock<std::mutex> lock(_mutex);
if(_queue.empty()) {
while(true) {
if(_newItemReady.wait_for(lock, RESOLUTION,
[&_queue, &_kill]{ return _queue.size() || _kill;}))
break;
}
}
if(_queue.size()) {
CEvent event = _queue.front();
_queue.pop();
return event;
}
throw EThreadKilled();
}
};
const std::chrono::milliseconds CEventQueue::RESOLUTION(100);
class CPort;
class CEventObject {
CEventQueue &_eventQueue;
CEventList &_stuckEvents;
public:
CEventObject(CEventQueue &eventQueue, CEventList &stuckEvents):
_eventQueue(eventQueue), _stuckEvents(stuckEvents) {}
virtual ~CEventObject()
{
// make sure that objects is not on stuck events list anymore
std::remove_if(begin(_stuckEvents), end(_stuckEvents),
[this](CEvent &event){ return std::get<1>(event) == this; });
}
CEventQueue &EventQueue() const { return _eventQueue; }
CEventList &StuckEvents() const { return _stuckEvents; }
virtual void StuckEventListUpdate(unsigned id) = 0;
virtual bool Receive(const CPort *port = nullptr,
const CMessage *templ = nullptr,
const std::string &from = "",
std::function<void(const CMessage &)> &&value = nullptr,
std::string *sender = nullptr)
{
return false;
}
};
namespace std {
namespace chrono {
typedef monotonic_clock steady_clock;
}
}
class CTimer : public CEventObject {
typedef std::chrono::steady_clock CClock;
const CClock::duration _duration;
CClock::time_point _start;
public:
template<typename Rep, typename Period>
explicit CTimer(std::chrono::duration<Rep, Period> duration, CEventQueue &eventQueue, CEventList &stuckEvents):
CEventObject{eventQueue, stuckEvents}, _duration{std::chrono::duration_cast<CClock::duration>(duration)} {}
void Start() { _start = CClock::now(); }
bool Timeout() const { return CClock::now() - _start > _duration; }
virtual void StuckEventListUpdate(unsigned id) {}
};
class CPort : public CEventObject {
public:
typedef std::vector<std::string> CPeers;
private:
typedef std::queue<std::tuple<unsigned, CMessage>> CMessageQueue;
const std::string _name;
const CComponentHandler &_ch;
CPeers _peers;
CMessageQueue _inputQueue;
std::mutex _inputQueueMutex;
unsigned _nextId;
public:
CPort(const std::string &name, const CComponentHandler &ch, CEventQueue &eventQueue, CEventList &stuckEvents):
CEventObject{eventQueue, stuckEvents}, _name{name}, _ch(ch), _nextId{0} {}
const std::string &Name() const { return _name; }
const CPeers &Peers() const { return _peers; }
void Connect(const std::string &name) { _peers.push_back(name); }
void Enqueue(const CMessage &m)
{
std::unique_lock<std::mutex> inputQueueLock(_inputQueueMutex, std::defer_lock);
std::unique_lock<std::mutex> eventQueueLock(EventQueue().Mutex(), std::defer_lock);
std::lock(inputQueueLock, eventQueueLock);
unsigned id = _nextId++;
_inputQueue.push(std::make_tuple(id, m));
EventQueue().PushLocked(CEvent(id, this));
}
void Send(const std::string &name, const CMessage &m) { _ch.Send(name, m); }
bool Receive(const CPort *port = nullptr,
const CMessage *templ = nullptr,
const std::string &from = "",
std::function<void(const CMessage &)> &&value = nullptr,
std::string *sender = nullptr)
{
if(port && port != this)
return false;
std::lock_guard<std::mutex> lock(_inputQueueMutex);
if(_inputQueue.size()) {
const CMessage &msg = std::get<1>(_inputQueue.front());
if((!templ || msg == *templ) && (from == "" || std::get<0>(msg) == from)) {
if(value)
value(msg);
if(sender)
*sender = std::get<0>(msg);
_inputQueue.pop();
return true;
}
}
return false;
}
virtual void StuckEventListUpdate(unsigned id)
{
CEvent event{id, this};
CEventList &stuckEvents = StuckEvents();
auto it = std::find_if(begin(stuckEvents), end(stuckEvents),
[&event](CEvent &e) { return e == event;});
bool stucked = false;
{
std::lock_guard<std::mutex> lock{_inputQueueMutex};
stucked = (_inputQueue.size() && std::get<0>(_inputQueue.front()) == id);
}
if(stucked) {
if(it == stuckEvents.end())
stuckEvents.push_back(event);
}
else {
if(it != stuckEvents.end())
stuckEvents.erase(it);
}
}
};
class CComponent {
public:
enum class TStatus {
RUNNING,
STOPPED,
KILLED
};
private:
const std::string _name;
const CLogging &_log;
std::atomic_bool _kill;
CEventQueue _eventQueue;
CEventList _stuckEvents;
CPort _port;
mutable std::mutex _mutex;
TStatus _status;
bool _verdict;
std::future<void> _result;
void RunThread(const CBehaviour &behaviour)
{
_log.LogStarting(_name);
Status(TStatus::RUNNING);
try {
behaviour.Run(*this);
Status(TStatus::STOPPED);
_log.LogStopped(_name);
}
catch(EThreadStopped &) {
Status(TStatus::STOPPED);
_log.LogStopped(_name);
}
catch(EThreadKilled &) {
Status(TStatus::KILLED);
_log.LogKilled(_name);
}
}
void Status(TStatus status)
{
std::lock_guard<std::mutex> lock(_mutex);
_status = status;
}
public:
CComponent(const std::string &name, const CComponentHandler &ch, const CLogging &log) :
_name{name}, _log(log), _kill{false},
_eventQueue{_kill}, _port{name + ".0", ch, _eventQueue, _stuckEvents},
_status{TStatus::STOPPED}, _verdict{false}
{
}
~CComponent()
{
if(Status() == TStatus::RUNNING) {
_kill.store(true, std::memory_order_relaxed);
_result.wait();
}
}
const std::string &Name() const { return _name; }
CEventQueue &EventQueue() { return _eventQueue; }
CEventList &StuckEvents() { return _stuckEvents; }
TStatus Status() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _status;
}
CPort &Port() { return _port; }
void Start(const CBehaviour &behaviour)
{
_result = std::async(std::launch::async, [&]{ CComponent::RunThread(behaviour); });
}
void Verdict(bool verdict)
{
_verdict = verdict;
_log.LogVerdict(_name, verdict);
}
bool Verdict() { return _verdict; }
void Done() { _result.wait(); }
};
void CComponentHandler::Create(const std::string &name)
{
auto comp = std::make_shared<CComponent>(name, *this, _log);
_componentMap([&](CComponentMap &map){ map[name] = std::move(comp); });
_log.LogCreated(name);
}
void CComponentHandler::Connect(const std::string &name1, const std::string &name2)
{
auto comp1 = Component(name1);
auto comp2 = Component(name2);
comp1->Port().Connect(name2);
comp2->Port().Connect(name1);
_log.LogConnected(name1, name2);
}
void CComponentHandler::Start(const std::string &name, const CBehaviour &behaviour)
{
Component(name)->Start(behaviour);
}
void CComponentHandler::Send(const std::string &name, const CMessage &m) const
{
_log.LogSending(name, m);
Component(name)->Port().Enqueue(m);
}
void CComponentHandler::Done(const std::string &name) const
{
Component(name)->Done();
}
bool CComponentHandler::Verdict(const std::string &name) const
{
return Component(name)->Verdict();
}
class CAlt {
typedef std::tuple<std::function<bool(CEventObject &)>, std::function<void()> > CBranchData;
typedef std::deque<CBranchData> CBranches;
CEventQueue &_eventQueue;
CEventList &_stuckEvents;
CBranches _branches;
bool _repeat;
public:
CAlt(CEventQueue &eventQueue, CEventList &stuckEvents):
_eventQueue(eventQueue), _stuckEvents(stuckEvents), _repeat{false}
{
}
void Add(std::function<bool(CEventObject &)> &&predicate, std::function<void()> &&action)
{
_branches.push_back(std::make_tuple(predicate, action));
}
void Repeat()
{
_repeat = true;
}
// void Run()
void Run(CComponent &comp)
{
bool found;
do {
typedef std::set<CEventObject *> EventObjectSet;
EventObjectSet checkedObjects;
CEvent event;
CBranches::iterator it;
bool newEvent = false;
found = false;
// process old events
for(auto e : _stuckEvents) {
event = e;
checkedObjects.insert(std::get<1>(event));
it = std::find_if(begin(_branches), end(_branches),
[&event](const CBranchData &d)
{ return std::get<0>(d)(*std::get<1>(event)); });
if(it != _branches.end()) {
found = true;
break;
}
}
if(!found) {
// process new event
event = _eventQueue.PopWait();
newEvent = true;
// check if not a stucked object
if(checkedObjects.find(std::get<1>(event)) == checkedObjects.end()) {
it = std::find_if(begin(_branches), end(_branches),
[&event](const CBranchData &d)
{ return std::get<0>(d)(*std::get<1>(event)); });
if(it != _branches.end())
found = true;
}
}
if(found) {
// run the action
_repeat = false;
std::get<1>(*it)();
}
if(newEvent) {
// if that alternate does not wait on that object or an event was not removed from
// object internal events queue than add that event to the stuck events list
std::get<1>(event)->StuckEventListUpdate(std::get<0>(event));
}
}
while(_repeat || !found);
}
};
class CBehaviourServer : public CBehaviour {
public:
CBehaviourServer(CComponentHandler &ch): CBehaviour{ch} { }
void Run(CComponent &comp) const
{
CAlt alt{comp.EventQueue(), comp.StuckEvents()};
CMessage msg;
alt.Add([&](CEventObject &obj)
{ return obj.Receive(nullptr, nullptr, "", [&](const CMessage &m) { msg = m; }, nullptr); },
[&]{
CH().Send(std::get<0>(msg), CMessage(comp.Name(), std::get<1>(msg) * std::get<1>(msg)));
alt.Repeat();
});
alt.Run(comp);
}
};
class CBehaviourClient : public CBehaviour {
static const int RETRY_NUM = 10;
public:
CBehaviourClient(CComponentHandler &ch): CBehaviour(ch) {}
void Run(CComponent &comp) const
{
const std::chrono::milliseconds MSG_TIMEOUT(500);
const int RETRY_NUM = 5;
CPort &port = comp.Port();
CTimer t(MSG_TIMEOUT, comp.EventQueue(), comp.StuckEvents());
int count = 0;
port.Send(port.Peers().front(), CMessage(comp.Name(), count));
t.Start();
CAlt alt(comp.EventQueue(), comp.StuckEvents());
CMessage m(port.Peers().front(), count * count);
alt.Add([&](CEventObject &obj)
{ return obj.Receive(&port, &m); },
[&]{
comp.Verdict(true);
});
alt.Add([&](CEventObject &obj)
{ return obj.Receive(&port); },
[&]{
comp.Verdict(false);
});
alt.Add([&](CEventObject &obj)
{ return count < RETRY_NUM && t.Timeout(); },
[&]{
count = count + 1;
port.Send(port.Peers().front(), CMessage(comp.Name(), count));
alt.Repeat();
});
alt.Add([&](CEventObject &obj)
{ return count == RETRY_NUM && t.Timeout(); },
[&]{
comp.Verdict(false);
});
alt.Run(comp);
}
};
class CBehaviourTest : public CBehaviour {
constexpr static unsigned CLIENTS_COUNT = 5;
public:
CBehaviourTest(CComponentHandler &ch): CBehaviour(ch) {}
void Run(CComponent &comp) const
{
// create clients
for(unsigned i=0; i<CLIENTS_COUNT; i++)
CH().Create("client_" + std::to_string(i));
// connect clients
for(unsigned i=0; i<CLIENTS_COUNT; i++)
CH().Connect("client_" + std::to_string(i), comp.Port().Peers().front());
// start clients behavior
CBehaviourClient clientBehaviour(CH());
for(unsigned i=0; i<CLIENTS_COUNT; i++)
CH().Start("client_" + std::to_string(i), clientBehaviour);
// wait for clients finish
for(unsigned i=0; i<CLIENTS_COUNT; i++)
CH().Done("client_" + std::to_string(i));
// obtain clients verdict
bool verdict = true;
for(unsigned i=0; i<CLIENTS_COUNT; i++)
verdict &= CH().Verdict("client_" + std::to_string(i));
comp.Verdict(verdict);
}
};
int main()
{
CLogging log;
log.LogOther("Creating components");
CComponentHandler ch(log);
ch.Create("server");
ch.Create("test");
ch.Connect("server", "test");
log.LogOther("Running components");
CBehaviourServer serverBehavior(ch);
ch.Start("server", serverBehavior);
CBehaviourTest testBehavior(ch);
ch.Start("test", testBehavior);
ch.Done("test");
std::string verdict(ch.Verdict("test") ? "PASS" : "FAIL");
log.LogOther("Verdict: " + verdict);
}
#ifndef UTILS_H
#define UTILS_H
#include <thread>
#include <future>
#include <queue>
class CNonCopyable {
public:
CNonCopyable() = default;
CNonCopyable(const CNonCopyable &) = delete;
CNonCopyable &operator=(const CNonCopyable &) = delete;
};
template<class T>
class CMonitor {
mutable T _t;
mutable std::mutex _m;
public:
CMonitor(T t=T{}): _t(t) {}
template<typename F>
auto operator()(F f) const -> decltype(f(_t))
{
std::lock_guard<std::mutex> lck{_m};
return f(_t);
}
};
template<typename T>
class CWaitQueue : CNonCopyable {
std::queue<T> _queue;
std::condition_variable _newItemReady;
std::mutex _mutex;
public:
CWaitQueue() = default;
void Push(T &&msg)
{
std::lock_guard<std::mutex> lock{_mutex};
bool wasEmpty = _queue.empty();
_queue.emplace(std::move(msg));
if(wasEmpty)
_newItemReady.notify_one();
}
T PopWait()
{
std::unique_lock<std::mutex> lock{_mutex};
_newItemReady.wait(lock, [&]{ return !_queue.empty(); });
auto msg = std::move(_queue.front());
_queue.pop();
return msg;
}
};
namespace details {
template<typename FUT, typename F, typename T>
void SetValue(std::promise<FUT> &p, F &f, T &t)
{
p.set_value(f(t));
}
template<typename F, typename T>
void SetValue(std::promise<void> &p, F &f, T &t)
{
f(t);
p.set_value();
}
}
template<class T>
class CConcurrent {
mutable T _t;
mutable CWaitQueue<std::function<void()>> _queue;
bool _done;
std::thread _thread;
public:
CConcurrent(T t = T{}):
_t(t),
_done{false},
_thread{[=]{ while(!_done) _queue.PopWait()(); }}
{
}
~CConcurrent()
{
_queue.Push([=]{ _done = true; });
_thread.join();
}
template<typename F>
auto operator()(F f) const -> std::future<decltype(f(_t))>
{
auto p = std::make_shared<std::promise<decltype(f(_t))>>();
auto ret = p->get_future();
_queue.Push([=]{
try {
details::SetValue(*p, f, _t);
}
catch(...) {
p->set_exception(std::current_exception());
}
});
return ret;
}
};
#endif // UTILS_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment