Skip to content

Instantly share code, notes, and snippets.

@thierryseegers
Created April 23, 2014 02:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thierryseegers/11201003 to your computer and use it in GitHub Desktop.
Save thierryseegers/11201003 to your computer and use it in GitHub Desktop.
Demo version of event_channel implemented in C++03/Boost, C++11 and C++14.
Dummy file for the sake of having a good name for this gist.
cmake_minimum_required(VERSION 2.6)
project(event_channel_versions)
set(Boost_USE_STATIC_LIBS TRUE)
find_package(Boost COMPONENTS atomic chrono date_time system thread)
if(Boost_FOUND)
add_definitions(-DBoost_VERSION=${Boost_VERSION})
include_directories(${Boost_INCLUDE_DIRS})
endif()
if(CMAKE_GENERATOR STREQUAL Xcode)
set(CMAKE_XCODE_ATTRIBUTE_CLANG_CXX_LANGUAGE_STANDARD "c++1y")
elseif("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
list(APPEND CMAKE_CXX_FLAGS "-std=c++1y")
elseif("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
list(APPEND CMAKE_CXX_FLAGS "-std=c++1y -stdlib=libc++")
endif()
add_executable(versions event_channel_boost.h event_channel_cpp11.h event_channel_cpp14.h main.cpp)
if(Boost_FOUND)
target_link_libraries(versions ${Boost_LIBRARIES})
endif()
#if !defined(EVENT_CHANNEL_BOOST_H)
#define EVENT_CHANNEL_BOOST_H
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/functional/hash.hpp>
#include <boost/lambda/lambda.hpp>
#include <boost/thread.hpp>
#include <boost/type_traits.hpp>
#include <map>
#include <vector>
namespace event_channel { namespace cpp03 {
class channel
{
typedef boost::hash<std::string>::result_type event_tag_t; //!< Type of the tag of an event's type.
typedef boost::function<const void* ()> wrapped_event_t; //!< Type of functor that hides an event's type.
typedef std::pair<event_tag_t, wrapped_event_t> tagged_wrapped_event_t; //!< Type of the association of an event's type tag, and it's associated wrapper functor.
typedef std::vector<tagged_wrapped_event_t> tagged_wrapped_events_t; //!< Type of a set of events.
tagged_wrapped_events_t events_;
typedef boost::function<void (const void*)> wrapped_subscriber_t; //!< Type of functor that wraps a subscriber.
typedef std::vector<wrapped_subscriber_t> wrapped_subscribers_t; //!< Type of collection of wrapped subscribers.
typedef std::map<event_tag_t, wrapped_subscribers_t> subscribers_by_tag_t; //!< Type of subscribers key'ed by event tags.
subscribers_by_tag_t dispatcher_;
boost::mutex dispatcher_m_, events_m_;
boost::condition_variable events_cv_;
boost::atomic<bool> processing_;
boost::thread run_t_;
boost::hash<std::string> hasher_;
void run()
{
while(processing_)
{
tagged_wrapped_events_t events;
{
boost::unique_lock<boost::mutex> ul(events_m_);
events_cv_.wait(ul, !(boost::bind(&tagged_wrapped_events_t::empty, boost::cref(events_)) && boost::ref(processing_)));
events_.swap(events);
}
for(tagged_wrapped_events_t::const_iterator e = events.begin(); e != events.end(); ++e)
{
boost::lock_guard<boost::mutex> lg(dispatcher_m_);
const wrapped_subscribers_t& targets = dispatcher_[e->first];
for(wrapped_subscribers_t::const_iterator t = targets.begin(); t != targets.end(); ++t)
{
(*t)(e->second());
}
}
}
}
template<typename T>
static const T& cast_and_dereference(const void* p)
{
return *static_cast<const T*>(p);
}
public:
channel() : processing_(true)
{
run_t_ = boost::thread(&channel::run, this);
}
virtual ~channel()
{
processing_ = false;
{
boost::lock_guard<boost::mutex> lg(events_m_);
events_cv_.notify_one();
}
run_t_.join();
}
template<typename R, typename E>
void subscribe(R (*f)(E))
{
boost::lock_guard<boost::mutex> lg(dispatcher_m_);
typedef typename boost::remove_reference<typename boost::remove_cv<E>::type>::type raw_type;
dispatcher_[hasher_(typeid(E).name())].push_back(boost::bind(f, boost::bind(&cast_and_dereference<raw_type>, boost::lambda::_1)));
};
template<typename E>
void send(const E& e)
{
boost::lock_guard<boost::mutex> lg(events_m_);
events_.push_back(std::make_pair(hasher_(typeid(E).name()), boost::bind(&boost::addressof<E>, e)));
events_cv_.notify_one();
}
};
}}
#endif
#if !defined(EVENT_CHANNEL_CPP11_H)
#define EVENT_CHANNEL_CPP11_H
#include <atomic>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <type_traits>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>
namespace event_channel { namespace cpp11 {
class channel
{
typedef std::type_index event_tag_t; //!< Type of the tag of an event's type.
typedef std::function<const void* ()> wrapped_event_t; //!< Type of functor that hides an event's type.
typedef std::pair<event_tag_t, wrapped_event_t> tagged_wrapped_event_t; //!< Type of the association of an event's type tag, and it's associated wrapper functor.
typedef std::vector<tagged_wrapped_event_t> tagged_wrapped_events_t; //!< Type of a set of events.
tagged_wrapped_events_t events_;
typedef std::function<void (const void*)> wrapped_subscriber_t; //!< Type of functor that wraps a subscriber.
typedef std::vector<wrapped_subscriber_t> wrapped_subscribers_t; //!< Type of collection of wrapped subscribers.
typedef std::map<event_tag_t, wrapped_subscribers_t> subscribers_by_tag_t; //!< Type of subscribers key'ed by event tags.
subscribers_by_tag_t dispatcher_;
std::mutex dispatcher_m_, events_m_;
std::condition_variable events_cv_;
std::atomic<bool> processing_;
std::thread run_t_;
template<typename T>
using remove_cv_ref = std::remove_reference<typename std::remove_cv<T>::type>;
void run()
{
while(processing_)
{
tagged_wrapped_events_t events;
{
std::unique_lock<std::mutex> ul(events_m_);
events_cv_.wait(ul, [this]{ return !processing_ || !events_.empty(); });
events_.swap(events);
}
for(auto const& tagged_wrapped_event : events)
{
std::lock_guard<std::mutex> lg(dispatcher_m_);
auto const& subscribers = dispatcher_[tagged_wrapped_event.first];
for(auto const& wrapped_subscriber : subscribers)
{
wrapped_subscriber(tagged_wrapped_event.second());
}
}
}
}
public:
channel() : processing_(true)
{
run_t_ = std::thread(&channel::run, this);
}
virtual ~channel()
{
processing_ = false;
{
std::lock_guard<std::mutex> lg(events_m_);
events_cv_.notify_one();
}
run_t_.join();
}
template<typename R, typename E>
void subscribe(R (*f)(E))
{
std::lock_guard<std::mutex> lg(dispatcher_m_);
dispatcher_[std::type_index(typeid(E))].push_back(
[f](const void* e)
{
(*f)(*static_cast<const typename remove_cv_ref<E>::type*>(e));
});
};
template<typename E>
void send(const E& e)
{
std::lock_guard<std::mutex> lg(events_m_);
events_.push_back(std::make_pair(std::type_index(typeid(E)), [e]{ return &e; }));
events_cv_.notify_one();
}
};
}}
#endif
#if !defined(EVENT_CHANNEL_CPP14_H)
#define EVENT_CHANNEL_CPP14_H
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>
namespace event_channel { namespace cpp14 {
// Hash value combiner for compile-time types. Adapted from boost.
template <typename T, typename... U>
struct hash_combine
{
static auto hash_code()
{
auto h = hash_combine<U...>::hash_code();
return h ^= std::type_index(typeid(T)).hash_code() + 0x9e3779b9 + (h << 6) + (h >> 2);
}
};
template<typename T>
struct hash_combine<T>
{
static auto hash_code()
{
return std::type_index(typeid(T)).hash_code();
}
};
template <typename... Args>
auto hash_code()
{
return hash_combine<Args...>::hash_code();
}
template<>
auto hash_code<>()
{
return hash_combine<void>::hash_code();
}
class channel
{
typedef std::result_of<decltype(&hash_code<>)()>::type event_tag_t; //!< Type of the tag of an event's type.
typedef std::function<const void* ()> wrapped_event_t; //!< Type of functor that hides an event's type.
typedef std::pair<event_tag_t, wrapped_event_t> tagged_wrapped_event_t; //!< Type of the association of an event's type tag, and it's associated wrapper functor.
typedef std::vector<tagged_wrapped_event_t> tagged_wrapped_events_t; //!< Type of a set of events.
tagged_wrapped_events_t events_;
typedef std::function<void (const void*)> wrapped_subscriber_t; //!< Type of functor that wraps a subscriber.
typedef std::vector<wrapped_subscriber_t> wrapped_subscribers_t; //!< Type of collection of wrapped subscribers.
typedef std::map<event_tag_t, wrapped_subscribers_t> subscribers_by_tag_t; //!< Type of subscribers key'ed by event tags.
subscribers_by_tag_t dispatcher_;
std::mutex dispatcher_m_, events_m_;
std::condition_variable events_cv_;
std::atomic<bool> processing_;
std::thread run_t_;
template<typename... Args>
using tuple_type_t = typename std::result_of<decltype(&std::make_tuple<Args...>)(Args...)>::type;
template<typename F, typename Tuple, size_t... Is>
static void invoke(F&& f, Tuple&& params, std::index_sequence<Is...>)
{
f(std::get<Is>(std::forward<Tuple>(params))...);
}
void run()
{
while(processing_)
{
tagged_wrapped_events_t events;
{
std::unique_lock<std::mutex> ul(events_m_);
events_cv_.wait(ul, [this]{ return !processing_ || !events_.empty(); });
events_.swap(events);
}
for(auto const& tagged_wrapped_event : events)
{
std::lock_guard<std::mutex> lg(dispatcher_m_);
auto const& subscribers = dispatcher_[tagged_wrapped_event.first];
for(auto const& wrapped_subscriber : subscribers)
{
wrapped_subscriber(tagged_wrapped_event.second());
}
}
}
}
public:
channel() : processing_(true)
{
run_t_ = std::thread(&channel::run, this);
}
virtual ~channel()
{
processing_ = false;
{
std::lock_guard<std::mutex> lg(events_m_);
events_cv_.notify_one();
}
run_t_.join();
}
template<typename R, typename... Args>
void subscribe(R (*f)(Args...))
{
std::lock_guard<std::mutex> lg(dispatcher_m_);
dispatcher_[hash_code<Args...>()].push_back(
[f](const void* params)
{
invoke(f, *static_cast<const tuple_type_t<Args...>*>(params), std::index_sequence_for<Args...>{});
});
}
template<typename... Args>
void send(Args&&... args)
{
std::lock_guard<std::mutex> lg(events_m_);
events_.push_back(std::make_pair(hash_code<Args...>(), [e = std::make_tuple(std::forward<Args>(args)...)]{ return &e; }));
events_cv_.notify_one();
}
};
}}
#endif
#if defined(Boost_VERSION)
#include "event_channel_boost.h"
#endif
#include "event_channel_cpp11.h"
#if defined(_LIBCPP_VERSION) // AFAICT, only libc++ has std::index_sequence in an official release as of April 2014.
#include "event_channel_cpp14.h"
#endif
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
using namespace std;
void foo(int i)
{
cout << __FUNCTION__ << ": " << i << endl;
}
void bar(int j)
{
cout << __FUNCTION__ << ": " << j << endl;
}
void baz(string const& s)
{
cout << __FUNCTION__ << ": " << s << endl;
}
void qux(float *f)
{
cout << __FUNCTION__ << ": " << *f << endl;
}
template<typename A, typename B, typename C>
void generic(A const& a, B const& b, C const& c)
{
cout << __FUNCTION__ << ": " << a << ' ' << b << ' ' << c << endl;
}
void nothing()
{
cout << __FUNCTION__ << endl;
}
template<typename Channel>
void test(Channel&& channel, string const& who)
{
channel.subscribe(foo);
channel.send(11);
this_thread::sleep_for(chrono::seconds(1));
channel.subscribe(bar);
channel.send(22);
this_thread::sleep_for(chrono::seconds(1));
channel.subscribe(qux);
float f = 44.4f;
channel.send(&f);
this_thread::sleep_for(chrono::seconds(1));
channel.subscribe(baz);
channel.send(string("Hello from ") + who);
this_thread::sleep_for(chrono::seconds(1));
}
int main()
{
#if defined(Boost_VERSION)
test(event_channel::cpp03::channel(), "C++03 + Boost");
cout << endl;
#endif
test(event_channel::cpp11::channel(), "C++11");
cout << endl;
#if defined(_LIBCPP_VERSION)
{
event_channel::cpp14::channel channel;
test(channel, "C++14");
channel.subscribe(generic<int, float, string>);
channel.send(66, 77.7f, string("the third argument"));
this_thread::sleep_for(chrono::seconds(1));
channel.subscribe(nothing);
channel.send();
this_thread::sleep_for(chrono::seconds(1));
}
#endif
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment