Created
July 1, 2014 14:20
-
-
Save till-varoquaux/3d03ebc05262662e5a54 to your computer and use it in GitHub Desktop.
A small asynchronous monad...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
/**
 * This is a quick abstraction over non-blocking asynchronous deferreds. In a
 * way you can see those as non-blocking futures (whereas std::future is
 * blocking).
 * This implementation is devoid of fanciness. Amongst other things:
 * - Deferreds are not cancelable
 * - They are not threadsafe. Callbacks are run in whichever thread the
 *   deferred was triggered and there's no mutex to protect from race
 *   conditions while registering callbacks/setting values. This is what we
 *   use libev for...
 */
#include <cassert>
#include <functional>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>
template<typename _Val, typename _Error = void> | |
class deferred_trigger; | |
template<typename _Val, typename _Error = void> | |
class __deferred_state; | |
template<class _Val, typename _Error = void> | |
class deferred { | |
private | |
: | |
// Needs to be shared between the trigger and the deferred. | |
// This were all the state actually resides. It's memory allocated so it never | |
// needs to move and its lifespan is not tied to the lifespan of the deferred. | |
std::shared_ptr<__deferred_state<_Val, _Error>> state_; | |
explicit deferred(std::shared_ptr<__deferred_state<_Val, _Error>> s) : | |
state_(s) {} | |
public | |
: | |
friend class deferred_trigger<_Val, _Error>; | |
deferred() = delete; | |
deferred(const deferred<_Val, _Error> &rhs) = delete; | |
deferred(deferred<_Val, _Error> &&rhs); | |
deferred<_Val, _Error>& operator= (const deferred<_Val, _Error> &rhs) = delete; | |
deferred<_Val, _Error>& operator= (deferred<_Val> &&rhs) = delete; | |
// Add a handler to be called when the deferred is resolved. | |
// It's an error to add a watcher to a deferred that already has | |
// one. | |
template <typename _F> | |
deferred<_Val, _Error>& success(_F &&f); | |
// Add a handler to be called when the deferred boinks out. | |
// It's an error to add a watcher to a deferred that already has | |
// one. | |
// Returns the object for chaining | |
template <typename _F> | |
deferred<_Val, _Error>& fail(_F &&f); | |
// Build a new deferred that is in failed state. | |
// Monadic fail | |
template<typename... _Args> | |
static deferred<_Val, _Error> failed(_Args&&... v); | |
// Monadic return | |
template<typename... _Args> | |
static deferred<_Val, _Error> resolved(_Args&&... v); | |
// Monadic bind, fills in both the fail state and the success state... | |
// F must be take a val and return a deferred<_Ret, _Error> | |
template<typename _Ret, typename _F> | |
deferred<_Ret, _Error> then(_F &&f); | |
// V.otherwise(f) returns a future that resolved to either the value or V or | |
// the value of f() if V fails. | |
// Monadic bind, fills in both the `fail` state and the `success` state... | |
// TODO: | |
// template<typename _New_err> | |
deferred<_Val, _Error> otherwise(std::function<deferred<_Val, _Error>()>); | |
}; // class deferred | |
template<typename _Val, typename _Error> | |
class deferred_trigger { | |
private | |
: | |
#ifndef NDEBUG | |
bool deferred_issued_ = false; | |
#endif | |
std::shared_ptr<__deferred_state<_Val, _Error>> state_; | |
public | |
: | |
deferred_trigger() : | |
state_(new(std::nothrow) __deferred_state<_Val, _Error> ()) {} | |
deferred_trigger(const deferred_trigger<_Val, _Error> &rhs) : | |
state_(rhs.state_) {} | |
deferred_trigger(deferred_trigger<_Val, _Error> &&rhs) : | |
state_(std::move(rhs.state_)) {} | |
deferred<_Val, _Error> deferred() { | |
#ifndef NDEBUG | |
assert (!deferred_issued_); | |
deferred_issued_ = true; | |
#endif | |
return ::deferred<_Val, _Error>(state_); | |
} | |
deferred_trigger<_Val, _Error>& operator= | |
(const deferred_trigger<_Val, _Error> &rhs) = delete; | |
deferred_trigger<_Val>& operator= | |
(deferred_trigger<_Val, _Error> &&rhs) = delete; | |
template<typename... _Args> | |
void set_value(_Args&&... v) { | |
state_->set_value(std::forward<_Args>(v)...); | |
} | |
template<typename... _Args> | |
void set_failed(_Args&&... v) { | |
state_->set_failed(std::forward<_Args>(v)...); | |
} | |
bool is_filled() { return !state_->is_waiting(); } | |
}; | |
//------------------------------------------------------------------------------ | |
// Implementation | |
/**
 * Storage for one payload (a success value or an error) plus the helpers
 * needed to hand that payload to a callback. The `void` specialisation below
 * covers payload-less outcomes so callers can treat both cases uniformly.
 */
template <typename _Val>
struct state_holder {
  _Val v;

  // Callback signature for this payload: receives the payload as an rvalue
  // and returns _T.
  template <typename _T> using cb_type = std::function<_T(_Val&&)>;

  // Builds the payload in place from `args`.
  template<typename... _Args>
  state_holder(_Args&&... args) : v(std::forward<_Args>(args)...) {}

  // Invokes `f` with the stored payload, which is moved out of the holder.
  // Returns whatever `f` returns.
  template <typename _F>
  auto run(_F &&f)
      -> decltype(std::forward<_F>(f)(std::declval<_Val&&>())) {
    return std::forward<_F>(f)(std::move(v));
  }

  // Invokes `f` with a temporary payload built from `args`, without storing
  // it. Returns whatever `f` returns.
  template <typename _F, typename... _Args>
  static auto call(_F &&f, _Args&&... args)
      -> decltype(std::forward<_F>(f)(std::declval<_Val&&>())) {
    return std::forward<_F>(f)(_Val(std::forward<_Args>(args)...));
  }

  ~state_holder() = default;
};

// Payload-less variant: nothing is stored and callbacks take no argument.
template <>
struct state_holder<void> {
  template <typename _T = void> using cb_type = std::function<_T()>;

  state_holder() {}

  // Invokes `f` with no arguments and returns its result.
  template <typename _F>
  static auto run(_F &&f) -> decltype(std::forward<_F>(f)()) {
    return std::forward<_F>(f)();
  }

  // Same as run(); provided so both specialisations offer the same interface.
  template <typename _F>
  static auto call(_F &&f) -> decltype(std::forward<_F>(f)()) {
    return std::forward<_F>(f)();
  }

  ~state_holder() = default;
};
template<typename _Val, typename _Error> | |
class __deferred_state { | |
friend class deferred<_Val, _Error>; | |
public | |
: | |
typedef state_holder<_Val> success_state_type; | |
typedef typename success_state_type::template cb_type<void> success_cb_type; | |
typedef state_holder<_Error> failed_state_type; | |
typedef typename failed_state_type::template cb_type<void> failed_cb_type; | |
private | |
: | |
struct cbs_t { | |
failed_cb_type on_failed; | |
success_cb_type on_success; | |
}; | |
union { | |
success_state_type val_; | |
failed_state_type err_; | |
cbs_t cbs_; | |
}; | |
enum class status { | |
waiting, // cbs_ | |
failed_not_fired, // err_ | |
success_not_fired, // val_ | |
done // -/- | |
} status_; | |
#ifndef NDEBUG | |
bool failed_cb_set_ = false; | |
bool success_cb_set_ = false; | |
#endif | |
public | |
: | |
__deferred_state(status st = status::waiting) : status_(st), | |
cbs_{nullptr, nullptr}{} | |
__deferred_state(const __deferred_state& rhs) = delete; | |
__deferred_state& operator= (const __deferred_state&) = delete; | |
~__deferred_state(); | |
template<typename... _Args> | |
void set_value(_Args&&... v); | |
template<typename... _Args> | |
void set_failed(_Args&&... v); | |
template<typename _F> | |
void fail(_F &&); | |
template<typename _F> | |
void success(_F &&); | |
bool is_waiting() { return status_ == status::waiting; } | |
}; | |
//------------------------------------------------------------------------------ | |
template<typename _Val, typename _Error> | |
deferred<_Val, _Error>::deferred(deferred<_Val, _Error> &&rhs) : | |
state_(std::move(rhs.state_)) {} | |
template<typename _Val, typename _Error> | |
template<typename _F> | |
deferred<_Val, _Error>& deferred<_Val, _Error>::success(_F &&f) { | |
state_->success(std::forward<_F>(f)); | |
return *this; | |
} | |
template<typename _Val, typename _Error> | |
template<typename _F> | |
deferred<_Val, _Error>& deferred<_Val, _Error>::fail(_F &&f) { | |
state_->fail(std::forward<_F>(f)); | |
return *this; | |
} | |
template<typename _Val, typename _Error> | |
template<typename... _Args> | |
deferred<_Val, _Error> deferred<_Val, _Error>::failed(_Args&&... v) { | |
std::shared_ptr<__deferred_state<_Val, _Error> > s | |
(new(std::nothrow) __deferred_state<_Val, _Error>); | |
s->set_failed(std::forward<_Args>(v)...); | |
return deferred<_Val, _Error>(s); | |
} | |
template<typename _Val, typename _Error> | |
template<typename... _Args> | |
deferred<_Val, _Error> deferred<_Val, _Error>::resolved(_Args&&... v) { | |
std::shared_ptr<__deferred_state<_Val, _Error> > s | |
(new(std::nothrow) __deferred_state<_Val, _Error>); | |
s->set_value(std::forward<_Args>(v)...); | |
return deferred<_Val, _Error>(s); | |
} | |
template<typename _Error, typename _V> | |
void _chain_success(deferred<_V, _Error> &src, | |
deferred_trigger<_V, _Error> dst) { | |
src.success([dst](_V&& r) mutable { | |
dst.set_value(std::move(r)); | |
}); | |
} | |
template<typename _Error> | |
void _chain_success (deferred<void, _Error> &src, | |
deferred_trigger<void, _Error> dst) { | |
src.success([dst]() mutable { | |
dst.set_value(); | |
}); | |
} | |
template<typename _V> | |
void _chain_fail(deferred<_V> &src, deferred_trigger<_V> dst) { | |
src.fail([dst]() mutable { | |
dst.set_failed(); | |
}); | |
} | |
template<typename _V, typename _Error> | |
void _chain_fail(deferred<_V, _Error> &src, deferred_trigger<_V, _Error> dst) { | |
src.fail([dst](_Error&& e) mutable { | |
dst.set_failed(std::move(e)); | |
}); | |
} | |
template<typename _V, typename _Error> | |
void _chain(deferred<_V, _Error> &&src, deferred_trigger<_V, _Error> dst) { | |
_chain_success<_Error>(src, dst); | |
_chain_fail<_V>(src, dst); | |
} | |
template<typename _Error, typename _Ret, typename _Val, typename _F> | |
typename std::enable_if<!std::is_void<_Val>::value, deferred<_Ret>>::type | |
_then(deferred<_Val, _Error> &v, _F f) { | |
deferred_trigger<_Ret> trig; | |
deferred<_Ret> res = trig.deferred(); | |
v.fail([trig] () mutable { | |
trig.set_failed(); | |
}).success([trig, f](_Val&& v) mutable { | |
_chain<_Ret, _Error>(f(std::move(v)), trig); | |
}); | |
return res; | |
} | |
template<typename _Error, typename _Ret, typename _F> | |
deferred<_Ret> _then(deferred<void> &v, _F &&f) { | |
deferred_trigger<_Ret> trig; | |
deferred<_Ret> res = trig.deferred(); | |
v.fail([trig] () mutable { | |
trig.set_failed(); | |
}).success([trig, f]() mutable { | |
_chain<void, _Error>(f(), trig); | |
}); | |
return res; | |
} | |
template<typename _Val, typename _Error> | |
template<typename _Ret, typename _F> | |
deferred<_Ret, _Error> deferred<_Val, _Error>::then(_F &&f) { | |
return _then<_Error, _Ret>(*this, std::forward<_F>(f)); | |
} | |
template <typename _Val> | |
deferred<_Val> _otherwise (deferred<_Val> &v, | |
std::function<deferred<_Val> ()> f) { | |
deferred_trigger<_Val> trig; | |
deferred<_Val> res = trig.deferred(); | |
_chain_success(v, trig); | |
v.fail([trig, f] () mutable { | |
_chain<_Val>(f(), trig); | |
}); | |
return res; | |
} | |
template <typename _Val, typename _Error> | |
deferred<_Val, _Error> _otherwise | |
(deferred<_Val, _Error> &v, | |
std::function<deferred<_Val, _Error> ()> f) { | |
deferred_trigger<_Val, _Error> trig; | |
deferred<_Val, _Error> res = trig.deferred(); | |
_chain_success(v, trig); | |
v.fail([trig, f] (_Error) mutable { | |
_chain<_Val, _Error>(f(), trig); | |
}); | |
return res; | |
} | |
template <typename _Val, typename _Error> | |
deferred<_Val, _Error> deferred<_Val, _Error>::otherwise | |
(std::function<deferred<_Val, _Error> ()> f) { | |
return _otherwise(*this, f); | |
} | |
//---------------------------------------------------------------------------- | |
// __deferred_state | |
template<typename _Val, typename _Error> | |
template<typename... _Args> | |
void __deferred_state<_Val, _Error>::set_value(_Args&&... v) { | |
assert(status_ == status::waiting); | |
if (cbs_.on_success) { | |
status_ = status::done; | |
success_state_type::call(cbs_.on_success, std::forward<_Args>(v)...); | |
cbs_.~cbs_t(); | |
} else { | |
cbs_.~cbs_t(); | |
new (&val_) success_state_type(std::forward<_Args>(v)...); | |
status_ = status::success_not_fired; | |
} | |
} | |
template<typename _Val, typename _Error> | |
template<typename... _Args> | |
void __deferred_state<_Val, _Error>::set_failed(_Args&&... v) { | |
assert(status_ == status::waiting); | |
if (cbs_.on_failed) { | |
status_ = status::done; | |
failed_state_type::call(cbs_.on_failed, std::forward<_Args>(v)...); | |
cbs_.~cbs_t(); | |
} else { | |
cbs_.~cbs_t(); | |
new (&val_) failed_state_type(std::forward<_Args>(v)...); | |
status_ = status::failed_not_fired; | |
} | |
} | |
template<typename _Val, typename _Error> | |
template<typename _F> | |
void __deferred_state<_Val, _Error>::success(_F &&f) { | |
#ifndef NDEBUG | |
assert(!success_cb_set_); | |
success_cb_set_ = true; | |
#endif | |
switch (status_) { | |
case status::waiting: | |
cbs_.on_success = std::forward<_F>(f); | |
break; | |
case status::success_not_fired: | |
val_.run(std::forward<_F>(f)); | |
val_.~success_state_type(); | |
status_ = status::done; | |
break; | |
case status::failed_not_fired: | |
case status::done: | |
break; | |
} | |
} | |
template<typename _Val, typename _Error> | |
template<typename _F> | |
void __deferred_state<_Val, _Error>::fail(_F &&f) { | |
#ifndef NDEBUG | |
assert (!failed_cb_set_); | |
failed_cb_set_ = true; | |
#endif | |
switch (status_) { | |
case status::waiting: | |
cbs_.on_failed = std::forward<_F>(f); | |
break; | |
case status::failed_not_fired: | |
err_.run(std::forward<_F>(f)); | |
err_.~failed_state_type(); | |
status_ = status::done; | |
break; | |
case status::success_not_fired: | |
case status::done: | |
break; | |
} | |
} | |
template<typename _Val, typename _Error> | |
__deferred_state<_Val, _Error>::~__deferred_state() { | |
switch (status_) { | |
case status::waiting: | |
cbs_.~cbs_t(); | |
break; | |
case status::failed_not_fired: | |
err_.~failed_state_type(); | |
break; | |
case status::success_not_fired: | |
val_.~success_state_type(); | |
break; | |
case status::done: | |
break; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment