Skip to content

Instantly share code, notes, and snippets.

Last active February 23, 2021 05:07
Show Gist options
  • Save 3noch/ebd89b5a6a657167f2f2a4a2a035386f to your computer and use it in GitHub Desktop.
Save 3noch/ebd89b5a6a657167f2f2a4a2a035386f to your computer and use it in GitHub Desktop.
// Various functions for working with containers of any kind.
#pragma once
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <tuple>
#include <vector>

#include <boost/optional.hpp>
// -------- Advanced C++ Trickery ----------------
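// From: (source link missing in this listing; the technique matches the widely cited
// Stack Overflow "function_traits" answer for inspecting lambda signatures -- TODO confirm and restore the URL)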
// Compile-time description of a callable's signature.
//
// For generic types (lambdas, functors), directly use the result of the signature of its
// 'operator()'. This requires `T` to have exactly one, non-template call operator.
template <typename T>
struct function_traits : public function_traits<decltype(&T::operator())>
{
};

// We specialize for pointers to (const) member functions; this is what the primary template
// resolves to for an ordinary, non-mutable lambda.
template <typename ClassType, typename ReturnType, typename... Args>
struct function_traits<ReturnType (ClassType::*)(Args...) const>
{
  // arity is the number of arguments.
  enum
  {
    arity = sizeof...(Args)
  };

  // The declared return type of the call operator.
  typedef ReturnType result_type;

  // The i-th argument is equivalent to the i-th tuple element of a tuple
  // composed of those arguments.
  template <std::size_t i>
  struct arg
  {
    typedef typename std::tuple_element<i, std::tuple<Args...>>::type type;
  };
};

// Non-const call operators (e.g. `mutable` lambdas) expose the same traits as their const
// counterpart, so simply reuse that specialization.
template <typename ClassType, typename ReturnType, typename... Args>
struct function_traits<ReturnType (ClassType::*)(Args...)>
  : public function_traits<ReturnType (ClassType::*)(Args...) const>
{
};
// ----------------------------------------------
// Map a function over an optional value.
//
// Returns an empty optional when `x` is empty; otherwise returns an optional holding `f(*x)`.
// `Result` defaults to the declared return type of `Fn::operator()` (via function_traits), so
// `Fn` must have a single, non-overloaded call operator unless `Result` is spelled out.
template <typename Fn, typename Arg = typename function_traits<Fn>::template arg<0>::type,
          typename Result = typename function_traits<Fn>::result_type>
inline boost::optional<Result> mapped(boost::optional<Arg> const& x, Fn f)
{
  if (!x)
  {
    return boost::optional<Result>();
  }
  return boost::optional<Result>(f(*x));
}
// Map a function over a vector
template <typename Fn, typename Arg = typename function_traits<Fn>::template arg<0>::type,
typename Result = typename function_traits<Fn>::result_type>
inline std::vector<Result> mapped(std::vector<Arg> const& x, Fn f)
std::vector<Result> out;
std::transform(x.begin(), x.end(), std::back_inserter(out), f);
return out;
// Filter a vector with a predicate. Elements for which the predicate is true are added to the
// result, in their original order; the input vector is left untouched.
template <typename Fn, typename T = typename function_traits<Fn>::template arg<0>::type>
inline std::vector<T> filtered(std::vector<T> const& v, Fn f)
{
  std::vector<T> out;
  std::copy_if(v.begin(), v.end(), std::back_inserter(out), f);
  return out;
}
// Zips two vectors together using the supplied function. The resulting vector is the same
// length as the shorter of the two input vectors.
template <typename Fn, typename Arg1 = typename function_traits<Fn>::template arg<0>::type,
typename Arg2 = typename function_traits<Fn>::template arg<1>::type,
typename Result = typename function_traits<Fn>::result_type>
std::vector<Result> zipWith(std::vector<Arg1> const& as, std::vector<Arg2> const& bs, Fn f)
size_t const shorterSize = std::min(as.size(), bs.size());
std::vector<Result> out;
for (size_t i = 0; i < shorterSize; i++)
out.push_back(f(as[i], bs[i]));
return out;
#include "FRP.hpp"
void main()
rx::Event<std::string> msgs;
auto const x =
rx::forEach(msgs, [](std::string const& msg) { std::cout << "GOT EVENT: " << msg << std::endl; });
rx::Event<std::string> mappedEvt = mappedOptional(
msgs, [](std::string const& m) -> boost::optional<std::string> { return std::string("same"); });
rx::Dynamic<size_t> msgsCounted = count(msgs);
auto const countsub =
rx::forEach(msgsCounted.updated(), [](size_t c) { std::cout << "Counted: " << c << std::endl; });
auto const countedFiltered = filtered(msgsCounted.updated(), [](size_t i) { return i % 2 == 0; });
auto const countfiltsub =
rx::forEach(countedFiltered, [](size_t c) { std::cout << "Filtered count: " << c << std::endl; });
auto const y = rx::forEach(
mappedEvt, [=](std::string const& msg) { std::cout << "GOT MAPPED EVENT: " << msg << std::endl; });
auto const xs = rx::collect(mappedEvt);
rx::Dynamic<std::string> dyn = rx::mkDynamic<std::string>(msgs, "unset");"Hello");"There");
std::cout << "DYN CURRENT: " << dyn.current() << std::endl;"Cool!!");
std::cout << "ALL: " << xs.result() << std::endl;
}"no subscribers");
#pragma once
#include <functional>
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
#include <ros/ros.h>
#include "Containers.hpp"
#include "ThreadSafe.hpp"
namespace rx
// Interface of an event stream: values of type `In` are pushed in through fire(), and
// subscribers are callbacks that receive values of type `Out`.
template <typename In, typename Out>
struct IEvent
{
  using input_type = In;
  using output_type = Out;

  // Polymorphic base: allow safe destruction through an IEvent pointer.
  virtual ~IEvent() = default;

  // Push one value into the event.
  virtual void fire(In const& t) = 0;

  // Register a callback. Only a weak reference is taken, so the caller owns the callback's
  // lifetime.
  virtual void subscribe(std::weak_ptr<std::function<void(Out const&)> const> const& f) = 0;
};
// Event that keeps weak references to its subscribers and calls the live ones on fire().
// It does no locking of its own.
template <typename T>
struct BasicEvent final : public IEvent<T, T>, private boost::noncopyable
{
  // Calls every still-alive subscriber with `t`, then erases the subscribers that were found
  // to be expired.
  void fire(T const& t) override
  {
    bool doCleaning = false;
    // Index-based loop that re-reads size() each iteration, and a local owning copy of the
    // callback, so a callback that subscribes to this event while it runs cannot invalidate
    // what is being iterated or executed.
    for (size_t i = 0; i < subscribers.size(); i++)
    {
      // A single lock() both tests for expiry and keeps the callback alive for the call;
      // testing expired() and locking afterwards could observe the callback dying in between.
      // NOTE(review): the invocation itself was missing from this listing and is reconstructed.
      std::shared_ptr<std::function<void(T const&)> const> const callback = subscribers[i].lock();
      if (callback)
      {
        (*callback)(t);
      }
      else
      {
        doCleaning = true;
      }
    }

    if (doCleaning)
    {
      // std::remove_if only moves the survivors to the front; erase() is needed to actually
      // shrink the vector (the result of remove_if used to be discarded).
      subscribers.erase(
          std::remove_if(subscribers.begin(), subscribers.end(),
                         [](std::weak_ptr<std::function<void(T const&)> const> const& x) { return x.expired(); }),
          subscribers.end());
    }
  }

  // Appends `f` to the subscriber list.
  // NOTE(review): body reconstructed -- the listing shows only the signature.
  void subscribe(std::weak_ptr<std::function<void(T const&)> const> const& f) override
  {
    subscribers.push_back(f);
  }

  std::vector<std::weak_ptr<std::function<void(T const&)> const>> subscribers;
};
// A BasicEvent guarded by a ThreadSafe wrapper: fire() and subscribe() each run against the
// inner event through ThreadSafe::with().
template <typename T>
struct ThreadSafeEvent final : public IEvent<T, T>, private boost::noncopyable
{
  // Forwards `t` to the inner event.
  // ThreadSafe::with() holds its mutex for the whole callback, so subscriber callbacks run
  // under that lock and must not fire or subscribe on this same event.
  // NOTE(review): the forwarded call was stripped from this listing (`{; }`); reconstructed.
  void fire(T const& t) override
  {
    // TODO: Loop over copy of subscribers instead?
    event.with([&](BasicEvent<T>& e) { e.fire(t); });
  }

  // Registers `f` on the inner event.
  void subscribe(std::weak_ptr<std::function<void(T const&)> const> const& f) override
  {
    event.with([&](BasicEvent<T>& e) { e.subscribe(f); });
  }

  ThreadSafe<BasicEvent<T>> event;
};
// Copyable handle to a shared IEvent implementation. Copies of an Event refer to the same
// underlying event object.
template <typename T>
struct Event final : public IEvent<T, T>
{
  // Creates a fresh ThreadSafeEvent as the underlying implementation.
  Event() : event(std::make_shared<ThreadSafeEvent<T>>())
  {
  }

  // Wraps an existing implementation; `e` must not be null.
  Event(std::shared_ptr<IEvent<T, T>> e) : event(e)
  {
    assert(e != nullptr);
  }

  // NOTE(review): the bodies of fire() and subscribe() are absent from this listing; plain
  // forwarding to the wrapped event is reconstructed here.
  void fire(T const& t) override
  {
    event->fire(t);
  }

  void subscribe(std::weak_ptr<std::function<void(T const&)> const> const& f) override
  {
    event->subscribe(f);
  }

  std::shared_ptr<IEvent<T, T>> event;
};
// Event derived from another event. It owns the callback that is registered on `upstream`
// (events only keep weak references to callbacks) and keeps `upstream` itself alive, while
// behaving like `downstream` towards its own users.
template <typename In, typename Out>
struct ChainedEvent final : public IEvent<Out, Out>
{
  // TODO: Apply functor laws optimization (e.g. fuzed chain-events)
  ChainedEvent(std::shared_ptr<std::function<void(In const&)> const> upstreamSubscription, Event<In> upstream,
               Event<Out> downstream)
    : upstreamSubscription(upstreamSubscription), upstream(upstream), downstream(downstream)
  {
    // NOTE(review): reconstructed. Nothing visible in this listing attaches the callback to
    // the upstream event, and without that the chain never fires; confirm this is where the
    // original did it.
    this->upstream.subscribe(this->upstreamSubscription);
  }

  // NOTE(review): bodies reconstructed as forwarding to `downstream`.
  void fire(Out const& t) override
  {
    downstream.fire(t);
  }

  void subscribe(std::weak_ptr<std::function<void(Out const&)> const> const& f) override
  {
    downstream.subscribe(f);
  }

  std::shared_ptr<std::function<void(In const&)> const> upstreamSubscription;
  Event<In> upstream;
  Event<Out> downstream;
};
// An event that never fires and therefore has no real subscriptions: fire() discards its
// argument and subscribe() does not retain the callback.
template <typename In, typename Out = In>
struct NeverEvent : public IEvent<In, Out>
{
  void fire(In const& /*t*/) override
  {
  }

  void subscribe(std::weak_ptr<std::function<void(Out const&)> const> const& /*f*/) override
  {
  }
};
// Derives a new event from `in`: each upstream value is passed to `f`, and when `f` returns a
// non-empty optional its content is fired on the resulting event. Empty results are dropped.
//
// `Fn` must return an optional-like type exposing `value_type`; `Downstream` defaults to it.
// NOTE(review): this function does not itself subscribe the callback to `in`; that is assumed
// to happen inside ChainedEvent's constructor -- confirm.
template <typename Fn, typename Upstream = typename function_traits<Fn>::template arg<0>::type,
          typename Downstream = typename function_traits<Fn>::result_type::value_type>
inline Event<Downstream> mappedOptional(Event<Upstream> in, Fn f)
{
  Event<Downstream> downstream;
  std::shared_ptr<ChainedEvent<Upstream, Downstream>> chained = std::make_shared<ChainedEvent<Upstream, Downstream>>(
      // `mutable` because the captured copy of `downstream` is fired, which is a non-const call.
      std::make_shared<std::function<void(Upstream const&)> const>([=](Upstream const& upstream) mutable {
        boost::optional<Downstream> const result = f(upstream);
        if (result)
        {
          // NOTE(review): the listing shows only `if (result)*result);` -- the call target was
          // stripped; firing on `downstream` is reconstructed.
          downstream.fire(*result);
        }
      }),
      in, downstream);
  return Event<Downstream>(chained);
}
// Map a function over an event: the returned event fires `f(x)` for every value `x` seen on
// `in`. Implemented on top of mappedOptional with a result that is never empty.
template <typename Fn, typename Upstream = typename function_traits<Fn>::template arg<0>::type,
          typename Downstream = typename function_traits<Fn>::result_type>
inline Event<Downstream> mapped(Event<Upstream> in, Fn f)
{
  return mappedOptional(in, [=](Upstream const& upstream) { return boost::optional<Downstream>(f(upstream)); });
}
// Filter an event with a predicate: the returned event fires exactly those values of `in` for
// which `f` returns true. Implemented on top of mappedOptional.
template <typename Fn, typename T = typename function_traits<Fn>::template arg<0>::type>
inline Event<T> filtered(Event<T> in, Fn f)
{
  return mappedOptional(in, [=](T const& t) { return f(t) ? boost::optional<T>(t) : boost::optional<T>(); });
}
// Creates an Event handle backed by a NeverEvent.
template <typename In, typename Out = In>
inline Event<Out> never()
{
  return Event<Out>(std::make_shared<NeverEvent<In, Out>>());
}
// Interface of a sink: something that can be asked for a result of type `T`
// (`T` may be void).
template <typename T>
struct ISink
{
  using value_type = T;

  // Polymorphic base: allow safe destruction through an ISink pointer.
  virtual ~ISink() = default;

  virtual T result() const = 0;
};
// Copyable handle to a shared ISink implementation; result() forwards to it.
template <typename T>
struct Sink final : public ISink<T>
{
  // Wraps an existing implementation; `sink` must not be null.
  Sink(std::shared_ptr<ISink<T>> sink) : sink(sink)
  {
    assert(sink != nullptr);
  }

  T result() const override
  {
    return sink->result();
  }

  std::shared_ptr<ISink<T>> const sink;
};
// Sink that produces no value. It owns a subscription callback and keeps the upstream event
// alive, so the callback stays registered for as long as the sink exists.
template <typename T>
struct VoidSink final : public ISink<void>
{
  // `subscription` must not be null.
  VoidSink(std::shared_ptr<std::function<void(T const&)> const> subscription, Event<T> upstream)
    : subscription(subscription), upstream(upstream)
  {
    assert(subscription != nullptr);
    // NOTE(review): reconstructed. Nothing visible in this listing registers the callback
    // with the upstream event; confirm this is where the original did it.
    this->upstream.subscribe(this->subscription);
  }

  void result() const override
  {
  }

  std::shared_ptr<std::function<void(T const&)> const> subscription;
  Event<T> upstream;
};
// Sink that appends every value seen on the upstream event to a vector; result() returns a
// copy of what has been collected so far.
template <typename T>
struct VectorSink final : public ISink<std::vector<T>>
{
  VectorSink(Event<T> upstream)
    : upstream(upstream)
    // The callback refers to `items` through `this`; `items` is declared before
    // `subscription`, so it is constructed first and destroyed last.
    , subscription(std::make_shared<std::function<void(T const&)> const>([this](T const& t) {
      items.with([&](std::vector<T>& vec) { vec.push_back(t); });
    }))
  {
    // NOTE(review): reconstructed. The listing is cut off after the lambda and shows no
    // registration with the upstream event; confirm.
    this->upstream.subscribe(subscription);
  }

  std::vector<T> result() const override
  {
    return items.getCopy();
  }

  Event<T> upstream;
  ThreadSafe<std::vector<T>> items;
  std::shared_ptr<std::function<void(T const&)> const> subscription;
};
// Runs `f` on every value of `in`. The returned sink owns the subscription callback, so keep
// it alive for as long as `f` should keep being called.
template <typename Fn, typename In = typename function_traits<Fn>::template arg<0>::type, typename Out = In>
inline Sink<void> forEach(Event<In> in, Fn f)
{
  std::shared_ptr<VoidSink<In>> const sink = std::make_shared<VoidSink<In>>(
      std::make_shared<std::function<void(Out const&)> const>([=](Out const& t) { f(t); }), in);
  return Sink<void>(sink);
}
// Creates a sink backed by a VectorSink on `in`; its result() is a std::vector<T>.
template <typename T>
inline Sink<std::vector<T>> collect(Event<T> in)
{
  return Sink<std::vector<T>>(std::make_shared<VectorSink<T>>(in));
}
// Interface of a "dynamic": a value of type `T` that can be read at any time through
// current(), paired with an event obtained through updated().
template <typename T>
struct IDynamic
{
  using value_type = T;

  // Polymorphic base: allow safe destruction through an IDynamic pointer.
  virtual ~IDynamic() = default;

  virtual Event<T> updated() const = 0;
  virtual T current() const = 0;
};
// Copyable handle to a shared IDynamic implementation; both accessors forward to it.
template <typename T>
struct Dynamic final : public IDynamic<T>
{
  // Wraps an existing implementation; `d` must not be null.
  Dynamic(std::shared_ptr<IDynamic<T>> d) : dynamic(d)
  {
    assert(d != nullptr);
  }

  Event<T> updated() const override
  {
    return dynamic->updated();
  }

  T current() const override
  {
    return dynamic->current();
  }

  std::shared_ptr<IDynamic<T>> dynamic;
};
// Dynamic that stores its current value behind a ThreadSafe wrapper and owns an `updater`
// callback which overwrites that value.
template <typename T>
struct BasicDynamic final : public IDynamic<T>, private boost::noncopyable
{
  BasicDynamic(Event<T> e, T initialValue)
    : currentValue(initialValue)
    , updater(std::make_shared<std::function<void(T const&)> const>([&](T const& t) { currentValue.set(t); }))
    , event(e)
  {
    // NOTE(review): reconstructed. The listing shows no constructor body, and nothing else
    // visible registers `updater` with the event passed in; confirm.
    event.subscribe(updater);
  }

  Event<T> updated() const override
  {
    return event;
  }

  T current() const override
  {
    return currentValue.getCopy();
  }

  // Declaration order matters: members are destroyed in reverse order of declaration.
  ThreadSafe<T> currentValue;
  // N.B. this holds a reference to currentValue, so it must be declared after it.
  std::shared_ptr<std::function<void(T const&)> const> updater;
  // N.B. this may hold references to other members, so it must be destructed first
  // (i.e. declared last).
  Event<T> event;
};
// Creates a Dynamic backed by a BasicDynamic built from `event` and `initialValue`.
template <typename T>
inline Dynamic<T> mkDynamic(Event<T> event, T initialValue)
{
  std::shared_ptr<BasicDynamic<T>> d = std::make_shared<BasicDynamic<T>>(event, initialValue);
  return Dynamic<T>(d);
}
// Folds an event into a Dynamic: starting from `initialValue`, every value `e` of `event`
// produces `f(e, current)`, which is fired on the dynamic's updated() event.
//
// The dynamic is first built on a never-firing placeholder, because the mapping callback
// needs a reference to the dynamic in order to read its current value.
template <typename Fn, typename T = typename function_traits<Fn>::template arg<0>::type,
          typename Result = typename function_traits<Fn>::result_type>
inline Dynamic<Result> fold(Event<T> event, Result initialValue, Fn f)
{
  std::shared_ptr<BasicDynamic<Result>> d = std::make_shared<BasicDynamic<Result>>(never<Result>(), initialValue);
  // NOTE(review): `selfRef` is captured by reference by a callback that lives inside
  // `d->event`. Copies of that event handed out through updated() can outlive `*d`, at which
  // point the reference would dangle -- confirm callers never keep the event longer than the
  // dynamic.
  BasicDynamic<Result>& selfRef = *d;
  d->event = mapped(event, [&selfRef, f](T const& e) { return f(e, selfRef.current()); });
  // NOTE(review): reconstructed. The listing never attaches the updater to the replacement
  // event, and without that current() could not advance past `initialValue`; confirm.
  d->event.subscribe(d->updater);
  return Dynamic<Result>(d);
}
// Counts the values of `e`: a fold starting at 0 that adds one per value, ignoring the value
// itself.
template <typename T>
inline Dynamic<size_t> count(Event<T> e)
{
  return fold(e, static_cast<size_t>(0), [](T const& /*unused*/, size_t prev) -> size_t { return prev + 1; });
}
} // namespace rx
#pragma once
#include <mutex>
#include <boost/noncopyable.hpp>
// A value of type `T` bundled with a mutex. Every member function below locks the mutex for
// the duration of its access to the value, including for as long as a callback passed to
// with() is running. Instances cannot be copied.
template <typename T>
struct ThreadSafe
{
  // Value-initializes the wrapped object (needed by users that hold a ThreadSafe member
  // without passing it an initial value).
  ThreadSafe() : t_()
  {
  }

  ThreadSafe(T&& t) : t_(std::move(t))
  {
  }

  ThreadSafe(T const& t) : t_(t)
  {
  }

  // Non-copyable, stated directly instead of through a boost::noncopyable base.
  ThreadSafe(ThreadSafe const&) = delete;
  ThreadSafe& operator=(ThreadSafe const&) = delete;

  // Replaces the wrapped value.
  void set(T t)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    t_ = std::move(t);  // `t` is already our own copy; no need to copy it again
  }

  // Makes a copy of the underlying object and returns it.
  // TODO: This is probably not sufficiently correct.
  T getCopy() const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return T(t_);
  }

  // Runs `f` on the wrapped value while holding the lock and returns its result.
  // `Result` cannot be deduced from a bare lambda: pass a std::function or name it explicitly.
  template <typename Result>
  Result with(std::function<Result(T&)> f)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return f(t_);
  }

  // Runs `f` on the wrapped value while holding the lock.
  void with(std::function<void(T&)> f)
  {
    std::lock_guard<std::mutex> guard(mutex_);
    f(t_);
  }

  // Read-only variant returning the callback's result.
  template <typename Result>
  Result with(std::function<Result(T const&)> f) const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return f(t_);
  }

  // Read-only variant. The listing locked the mutex here but never invoked `f`.
  void with(std::function<void(T const&)> f) const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    f(t_);
  }

  mutable std::mutex mutex_;  // mutable so the const accessors can lock
  T t_;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment