Skip to content

Instantly share code, notes, and snippets.

@till-varoquaux
Created July 1, 2014 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save till-varoquaux/3d03ebc05262662e5a54 to your computer and use it in GitHub Desktop.
Save till-varoquaux/3d03ebc05262662e5a54 to your computer and use it in GitHub Desktop.
A small asynchronous monad...
#pragma once
/**
* This is a quick abstraction over non-blocking asynchronous deferreds. In a
* way you can see those as non-blocking futures (whereas std::future are
* blocking).
* This implementation is devoid of fanciness. Amongst other things:
* - Deferreds are not cancelable
* - They are not thread-safe. Callbacks are run in whichever thread the
* deferred was triggered, and there's no mutex to protect against race
* conditions while registering callbacks/setting values. This is what we use
* libev for...
*/
#include <functional>
#include <vector>
#include <cassert>
#include <memory>
// Forward declaration of the producer-side handle (defined further down).
// _Error defaults to void.
template<typename _Val, typename _Error = void>
class deferred_trigger;
// Forward declaration of the state object that deferred and deferred_trigger
// both hold through a std::shared_ptr (defined further down).
// _Error defaults to void.
template<typename _Val, typename _Error = void>
class __deferred_state;
// Consumer-side handle on an asynchronous result of type _Val that may instead
// fail with an _Error (void by default).
// A deferred is move-constructible only: it cannot be default-constructed,
// copied or assigned. Instances are obtained from a deferred_trigger or from
// the static factories failed() / resolved().
template<class _Val, typename _Error = void>
class deferred {
private:
  // Needs to be shared between the trigger and the deferred.
  // This is where all the state actually resides. It's heap allocated so it
  // never needs to move and its lifespan is not tied to the lifespan of the
  // deferred.
  std::shared_ptr<__deferred_state<_Val, _Error>> state_;

  // Wraps an existing shared state; only deferred_trigger (a friend) and the
  // static factories use this.
  explicit deferred(std::shared_ptr<__deferred_state<_Val, _Error>> s) :
    state_(std::move(s)) {}

public:
  friend class deferred_trigger<_Val, _Error>;

  deferred() = delete;
  deferred(const deferred<_Val, _Error> &rhs) = delete;
  deferred(deferred<_Val, _Error> &&rhs);
  deferred<_Val, _Error>& operator= (const deferred<_Val, _Error> &rhs) = delete;
  // Both template arguments are spelled out so the deleted overload matches
  // this exact specialization, whatever _Error is.
  deferred<_Val, _Error>& operator= (deferred<_Val, _Error> &&rhs) = delete;

  // Add a handler to be called when the deferred is resolved.
  // It's an error to add a watcher to a deferred that already has
  // one.
  // Returns the object for chaining.
  template <typename _F>
  deferred<_Val, _Error>& success(_F &&f);

  // Add a handler to be called when the deferred fails.
  // It's an error to add a watcher to a deferred that already has
  // one.
  // Returns the object for chaining.
  template <typename _F>
  deferred<_Val, _Error>& fail(_F &&f);

  // Build a new deferred that is already in the failed state; the arguments
  // are forwarded to build the error.
  // Monadic fail
  template<typename... _Args>
  static deferred<_Val, _Error> failed(_Args&&... v);

  // Build a new deferred that is already resolved; the arguments are
  // forwarded to build the value.
  // Monadic return
  template<typename... _Args>
  static deferred<_Val, _Error> resolved(_Args&&... v);

  // Monadic bind, fills in both the fail state and the success state...
  // F must take a value and return a deferred<_Ret, _Error>.
  // NOTE(review): the _then helpers in this file build a deferred<_Ret> with
  // the default error type, so this looks usable only when _Error is void --
  // confirm before relying on other error types.
  template<typename _Ret, typename _F>
  deferred<_Ret, _Error> then(_F &&f);

  // V.otherwise(f) returns a deferred that resolves to either the value of V
  // or, if V fails, the value of f().
  // Monadic bind, fills in both the `fail` state and the `success` state...
  // TODO:
  // template<typename _New_err>
  deferred<_Val, _Error> otherwise(std::function<deferred<_Val, _Error>()>);
}; // class deferred
template<typename _Val, typename _Error>
class deferred_trigger {
private
:
#ifndef NDEBUG
bool deferred_issued_ = false;
#endif
std::shared_ptr<__deferred_state<_Val, _Error>> state_;
public
:
deferred_trigger() :
state_(new(std::nothrow) __deferred_state<_Val, _Error> ()) {}
deferred_trigger(const deferred_trigger<_Val, _Error> &rhs) :
state_(rhs.state_) {}
deferred_trigger(deferred_trigger<_Val, _Error> &&rhs) :
state_(std::move(rhs.state_)) {}
deferred<_Val, _Error> deferred() {
#ifndef NDEBUG
assert (!deferred_issued_);
deferred_issued_ = true;
#endif
return ::deferred<_Val, _Error>(state_);
}
deferred_trigger<_Val, _Error>& operator=
(const deferred_trigger<_Val, _Error> &rhs) = delete;
deferred_trigger<_Val>& operator=
(deferred_trigger<_Val, _Error> &&rhs) = delete;
template<typename... _Args>
void set_value(_Args&&... v) {
state_->set_value(std::forward<_Args>(v)...);
}
template<typename... _Args>
void set_failed(_Args&&... v) {
state_->set_failed(std::forward<_Args>(v)...);
}
bool is_filled() { return !state_->is_waiting(); }
};
//------------------------------------------------------------------------------
// Implementation
// Owns a single _Val and hands it to callbacks as an rvalue.
template <typename _Val>
struct state_holder {
  // Signature of a callback that consumes a _Val&& and yields a _T.
  template <typename _T> using cb_type = std::function<_T(_Val&&)>;

  // The held value, built in place from the constructor arguments.
  _Val v;

  template<typename... _Ts>
  state_holder(_Ts&&... ctor_args) : v(std::forward<_Ts>(ctor_args)...) {}

  ~state_holder()=default;

  // Invokes fn on the held value, which is passed as an rvalue (so it may be
  // moved from), and returns whatever fn returns.
  template <typename _Fn>
  auto run(_Fn &&fn) -> typename std::result_of<_Fn(_Val&&)>::type {
    return fn(std::move(v));
  }

  // Builds a temporary _Val from ctor_args and invokes fn on it directly,
  // without needing a holder instance.
  template <typename _Fn, typename... _Ts>
  static auto call(_Fn &&fn, _Ts&&... ctor_args)
      -> typename std::result_of<_Fn(_Val&&)>::type {
    return fn(_Val(std::forward<_Ts>(ctor_args)...));
  }
};
// Specialization for "no value": nothing is stored, and callbacks take no
// argument.
template <>
struct state_holder<void> {
  // Signature of a nullary callback yielding a _T (void by default).
  template <typename _T = void> using cb_type = std::function<_T()>;

  state_holder() {}
  ~state_holder()=default;

  // Invokes fn with no argument and returns its result.
  template <typename _Fn>
  static auto run(_Fn &&fn) -> typename std::result_of<_Fn()>::type {
    return fn();
  }

  // Same as run(); exists so both specializations expose the same pair of
  // entry points.
  template <typename _Fn>
  static auto call(_Fn &&fn) -> typename std::result_of<_Fn()>::type {
    return fn();
  }
};
// Shared state behind a deferred / deferred_trigger pair.
// Exactly one member of the anonymous union is alive at any time; which one is
// recorded in status_ (see the comments on the enumerators).
template<typename _Val, typename _Error>
class __deferred_state {
  friend class deferred<_Val, _Error>;

public:
  using success_state_type = state_holder<_Val>;
  using success_cb_type = typename success_state_type::template cb_type<void>;
  using failed_state_type = state_holder<_Error>;
  using failed_cb_type = typename failed_state_type::template cb_type<void>;

private:
  // The callbacks registered while no outcome is known yet.
  struct cbs_t {
    failed_cb_type on_failed;
    success_cb_type on_success;
  };

  // Manually managed storage: members are created with placement new and
  // destroyed with explicit destructor calls by the member functions.
  union {
    success_state_type val_;
    failed_state_type err_;
    cbs_t cbs_;
  };

  enum class status {
    waiting, // cbs_
    failed_not_fired, // err_
    success_not_fired, // val_
    done // -/-
  } status_;

#ifndef NDEBUG
  // Debug-only guards against registering the same kind of handler twice.
  bool failed_cb_set_ = false;
  bool success_cb_set_ = false;
#endif

public:
  // Starts with empty callbacks alive in the union. The initializers are
  // listed in declaration order (union first, then status_), which is the
  // order in which they actually run.
  // NOTE(review): cbs_ is constructed whatever `st` is, while the destructor
  // picks the member to destroy from status_; only the default
  // (status::waiting) keeps the two consistent.
  __deferred_state(status st = status::waiting) : cbs_{nullptr, nullptr},
                                                  status_(st) {}
  __deferred_state(const __deferred_state& rhs) = delete;
  __deferred_state& operator= (const __deferred_state&) = delete;
  ~__deferred_state();

  // Record a successful outcome; the arguments build the value.
  template<typename... _Args>
  void set_value(_Args&&... v);

  // Record a failed outcome; the arguments build the error.
  template<typename... _Args>
  void set_failed(_Args&&... v);

  // Register the failure handler.
  template<typename _F>
  void fail(_F &&);

  // Register the success handler.
  template<typename _F>
  void success(_F &&);

  // True while neither set_value nor set_failed has been recorded.
  bool is_waiting() const { return status_ == status::waiting; }
};
//------------------------------------------------------------------------------
template<typename _Val, typename _Error>
deferred<_Val, _Error>::deferred(deferred<_Val, _Error> &&rhs) :
state_(std::move(rhs.state_)) {}
// Registers the success handler on the shared state and returns this deferred
// so that calls can be chained.
template<typename _Val, typename _Error>
template<typename _Fn>
deferred<_Val, _Error>& deferred<_Val, _Error>::success(_Fn &&on_success) {
  deferred<_Val, _Error> &self = *this;
  self.state_->success(std::forward<_Fn>(on_success));
  return self;
}
// Registers the failure handler on the shared state and returns this deferred
// so that calls can be chained.
template<typename _Val, typename _Error>
template<typename _Fn>
deferred<_Val, _Error>& deferred<_Val, _Error>::fail(_Fn &&on_failure) {
  deferred<_Val, _Error> &self = *this;
  self.state_->fail(std::forward<_Fn>(on_failure));
  return self;
}
// Monadic fail: returns a deferred whose state has already been failed with an
// error built from the forwarded arguments.
template<typename _Val, typename _Error>
template<typename... _Args>
deferred<_Val, _Error> deferred<_Val, _Error>::failed(_Args&&... v) {
  // std::make_shared throws on allocation failure, so `s` is never null when
  // it is dereferenced on the next line.
  auto s = std::make_shared<__deferred_state<_Val, _Error>>();
  s->set_failed(std::forward<_Args>(v)...);
  return deferred<_Val, _Error>(std::move(s));
}
// Monadic return: returns a deferred whose state already holds a value built
// from the forwarded arguments.
template<typename _Val, typename _Error>
template<typename... _Args>
deferred<_Val, _Error> deferred<_Val, _Error>::resolved(_Args&&... v) {
  // std::make_shared throws on allocation failure, so `s` is never null when
  // it is dereferenced on the next line.
  auto s = std::make_shared<__deferred_state<_Val, _Error>>();
  s->set_value(std::forward<_Args>(v)...);
  return deferred<_Val, _Error>(std::move(s));
}
// Forwards src's eventual value into dst: when src resolves, its value is
// moved into dst.set_value(). The handler keeps its own copy of the trigger.
template<typename _Error, typename _V>
void _chain_success(deferred<_V, _Error> &src,
                    deferred_trigger<_V, _Error> dst) {
  auto relay_value = [dst](_V&& value) mutable {
    dst.set_value(std::move(value));
  };
  src.success(std::move(relay_value));
}
// Overload for value-less deferreds: when src resolves, dst is resolved with
// no argument.
template<typename _Error>
void _chain_success (deferred<void, _Error> &src,
                     deferred_trigger<void, _Error> dst) {
  auto relay_done = [dst]() mutable {
    dst.set_value();
  };
  src.success(std::move(relay_done));
}
// Overload for deferreds using the default (void) error type: when src fails,
// dst is failed with no argument.
template<typename _V>
void _chain_fail(deferred<_V> &src, deferred_trigger<_V> dst) {
  auto relay_failure = [dst]() mutable {
    dst.set_failed();
  };
  src.fail(std::move(relay_failure));
}
// Forwards src's eventual error into dst: when src fails, the error is moved
// into dst.set_failed().
template<typename _V, typename _Error>
void _chain_fail(deferred<_V, _Error> &src, deferred_trigger<_V, _Error> dst) {
  auto relay_error = [dst](_Error&& error) mutable {
    dst.set_failed(std::move(error));
  };
  src.fail(std::move(relay_error));
}
// Wires both outcomes of src into dst: the success path first, then the
// failure path.
template<typename _V, typename _Error>
void _chain(deferred<_V, _Error> &&src, deferred_trigger<_V, _Error> dst) {
  // A named rvalue reference is an lvalue, which is what the helpers take.
  deferred<_V, _Error> &pending = src;
  _chain_success<_Error>(pending, dst);
  _chain_fail<_V>(pending, dst);
}
// Bind for deferreds that carry a value (this overload is disabled when _Val
// is void). Returns a new deferred<_Ret> that
//  - fails (with no argument) when v fails;
//  - otherwise follows the deferred returned by f(value).
template<typename _Error, typename _Ret, typename _Val, typename _F>
typename std::enable_if<!std::is_void<_Val>::value, deferred<_Ret>>::type
_then(deferred<_Val, _Error> &v, _F f) {
  deferred_trigger<_Ret> trig;
  deferred<_Ret> res = trig.deferred();

  // Failure path: just propagate the failure.
  auto on_failure = [trig] () mutable {
    trig.set_failed();
  };
  v.fail(std::move(on_failure));

  // Success path: run the continuation and chain its result to our trigger.
  auto on_value = [trig, f](_Val&& value) mutable {
    _chain<_Ret, _Error>(f(std::move(value)), trig);
  };
  v.success(std::move(on_value));

  return res;
}
// Bind for value-less deferreds. Returns a new deferred<_Ret> that
//  - fails (with no argument) when v fails;
//  - otherwise follows the deferred returned by f().
template<typename _Error, typename _Ret, typename _F>
deferred<_Ret> _then(deferred<void> &v, _F &&f) {
  deferred_trigger<_Ret> trig;
  deferred<_Ret> res = trig.deferred();
  v.fail([trig] () mutable {
    trig.set_failed();
  }).success([trig, f]() mutable {
    // The chained types must be those of the *result* (_Ret), matching trig;
    // naming `void` here would only compile when _Ret itself is void.
    _chain<_Ret, _Error>(f(), trig);
  });
  return res;
}
// Monadic bind: delegates to the matching _then overload for this deferred.
template<typename _Val, typename _Error>
template<typename _Ret, typename _Fn>
deferred<_Ret, _Error> deferred<_Val, _Error>::then(_Fn &&continuation) {
  deferred<_Val, _Error> &self = *this;
  return _then<_Error, _Ret>(self, std::forward<_Fn>(continuation));
}
// Fallback for deferreds using the default (void) error type. Returns a new
// deferred that takes v's value when v succeeds, and otherwise follows the
// deferred produced by f().
template <typename _Val>
deferred<_Val> _otherwise (deferred<_Val> &v,
                           std::function<deferred<_Val> ()> f) {
  deferred_trigger<_Val> trig;
  deferred<_Val> res = trig.deferred();

  // Success path: pass the value straight through.
  _chain_success(v, trig);

  // Failure path: ask f for a replacement and chain it to our trigger.
  auto recover = [trig, f] () mutable {
    _chain<_Val>(f(), trig);
  };
  v.fail(std::move(recover));

  return res;
}
// Fallback for deferreds with an explicit error type. Returns a new deferred
// that takes v's value when v succeeds, and otherwise follows the deferred
// produced by f(); the original error is received but not used.
template <typename _Val, typename _Error>
deferred<_Val, _Error> _otherwise
  (deferred<_Val, _Error> &v,
   std::function<deferred<_Val, _Error> ()> f) {
  deferred_trigger<_Val, _Error> trig;
  deferred<_Val, _Error> res = trig.deferred();

  // Success path: pass the value straight through.
  _chain_success(v, trig);

  // Failure path: ask f for a replacement and chain it to our trigger.
  auto recover = [trig, f] (_Error) mutable {
    _chain<_Val, _Error>(f(), trig);
  };
  v.fail(std::move(recover));

  return res;
}
// Member entry point for the fallback combinator: delegates to the matching
// _otherwise overload for this deferred.
template <typename _Val, typename _Error>
deferred<_Val, _Error> deferred<_Val, _Error>::otherwise
  (std::function<deferred<_Val, _Error> ()> f) {
  deferred<_Val, _Error> &self = *this;
  return _otherwise(self, f);
}
//----------------------------------------------------------------------------
// __deferred_state
// Records a successful outcome. Must only be called while waiting.
//  - If a success handler is registered, it is invoked right away with a
//    value built from the arguments, and the state becomes `done`.
//  - Otherwise the value is stored until a success handler is registered.
template<typename _Val, typename _Error>
template<typename... _Args>
void __deferred_state<_Val, _Error>::set_value(_Args&&... v) {
  assert(status_ == status::waiting);
  if (cbs_.on_success) {
    // Move the handler out and tear the union member down *before* running
    // it: if the handler throws, nothing is leaked, and `this` is not touched
    // again after the call.
    success_cb_type cb(std::move(cbs_.on_success));
    cbs_.~cbs_t();
    status_ = status::done;
    success_state_type::call(cb, std::forward<_Args>(v)...);
  } else {
    cbs_.~cbs_t();
    // No union member is alive at this point; mark the state accordingly so
    // the destructor stays correct even if building the value throws.
    status_ = status::done;
    new (&val_) success_state_type(std::forward<_Args>(v)...);
    status_ = status::success_not_fired;
  }
}
// Records a failed outcome. Must only be called while waiting.
//  - If a failure handler is registered, it is invoked right away with an
//    error built from the arguments, and the state becomes `done`.
//  - Otherwise the error is stored until a failure handler is registered.
template<typename _Val, typename _Error>
template<typename... _Args>
void __deferred_state<_Val, _Error>::set_failed(_Args&&... v) {
  assert(status_ == status::waiting);
  if (cbs_.on_failed) {
    // Move the handler out and tear the union member down *before* running
    // it: if the handler throws, nothing is leaked, and `this` is not touched
    // again after the call.
    failed_cb_type cb(std::move(cbs_.on_failed));
    cbs_.~cbs_t();
    status_ = status::done;
    failed_state_type::call(cb, std::forward<_Args>(v)...);
  } else {
    cbs_.~cbs_t();
    // No union member is alive at this point; mark the state accordingly so
    // the destructor stays correct even if building the error throws.
    status_ = status::done;
    // The error lives in err_ (not val_): that is the member fail() reads and
    // the destructor destroys for failed_not_fired.
    new (&err_) failed_state_type(std::forward<_Args>(v)...);
    status_ = status::failed_not_fired;
  }
}
// Registers the success handler. In debug builds, registering a second one
// trips an assertion.
//  - waiting:            the handler is stored for later.
//  - success_not_fired:  the handler runs now on the stored value, which is
//                        then destroyed; the state becomes `done`.
//  - failed_not_fired / done: the handler is ignored.
template<typename _Val, typename _Error>
template<typename _F>
void __deferred_state<_Val, _Error>::success(_F &&f) {
#ifndef NDEBUG
  assert(!success_cb_set_);
  success_cb_set_ = true;
#endif
  if (status_ == status::waiting) {
    cbs_.on_success = std::forward<_F>(f);
    return;
  }
  if (status_ != status::success_not_fired) {
    return;
  }
  val_.run(std::forward<_F>(f));
  val_.~success_state_type();
  status_ = status::done;
}
// Registers the failure handler. In debug builds, registering a second one
// trips an assertion.
//  - waiting:           the handler is stored for later.
//  - failed_not_fired:  the handler runs now on the stored error, which is
//                       then destroyed; the state becomes `done`.
//  - success_not_fired / done: the handler is ignored.
template<typename _Val, typename _Error>
template<typename _F>
void __deferred_state<_Val, _Error>::fail(_F &&f) {
#ifndef NDEBUG
  assert (!failed_cb_set_);
  failed_cb_set_ = true;
#endif
  if (status_ == status::waiting) {
    cbs_.on_failed = std::forward<_F>(f);
    return;
  }
  if (status_ != status::failed_not_fired) {
    return;
  }
  err_.run(std::forward<_F>(f));
  err_.~failed_state_type();
  status_ = status::done;
}
// Destroys whichever union member status_ says is currently alive; in the
// `done` state there is none, so nothing is destroyed.
template<typename _Val, typename _Error>
__deferred_state<_Val, _Error>::~__deferred_state() {
  if (status_ == status::waiting) {
    cbs_.~cbs_t();
  } else if (status_ == status::failed_not_fired) {
    err_.~failed_state_type();
  } else if (status_ == status::success_not_fired) {
    val_.~success_state_type();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment