This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <functional> | |
#include <exception> | |
#include <optional> | |
#include <memory> | |
#include <cassert> | |
#include <atomic> | |
#include <iostream> | |
#include <mutex> | |
#include <condition_variable> | |
#include <thread> | |
#include <boost/smart_ptr/intrusive_ref_counter.hpp> | |
#include <boost/intrusive_ptr.hpp> | |
#include <boost/optional.hpp> | |
// Skip to around line 100. Everything up here is crappy shims | |
// for types in our codebase, and macro shorthands. | |
namespace future_details { | |
struct FutureRefCountable : boost::intrusive_ref_counter<FutureRefCountable>{ | |
virtual ~FutureRefCountable(); | |
}; | |
} | |
namespace stdx = std; | |
#define FWD(x) std::forward<decltype(x)>(x) | |
#define MV(x) std::move(x) | |
#define MONGO_STATIC_ASSERT(x) static_assert(x, #x) | |
#define MONGO_WARN_UNUSED_RESULT_CLASS [[nodiscard]] | |
#define NOINLINE_DECL [[gnu::noinline]] | |
#define MONGO_unlikely(x) (x) | |
#define invariant(x) assert(x) | |
using SpinLock = std::mutex; | |
template<typename T> | |
using unique_function = std::function<T>; | |
void sleepmillis(int); | |
struct Seconds{Seconds(int);}; | |
struct Date_t{ | |
Date_t operator +(Seconds); | |
bool operator <= (Date_t); | |
static Date_t now(); | |
}; | |
enum class ErrorCodes { ExceededTimeLimit }; | |
struct Status; | |
struct DBException { | |
Status toStatus() const; | |
}; | |
struct Status { | |
bool _isOK = true; | |
Status() = default; | |
template <typename T> | |
Status(T&&...) :Status(){} | |
bool isOK() const { return _isOK; } | |
}; | |
template <typename T> | |
struct StatusWith { | |
using value_type = T; | |
Status status; | |
boost::optional<T> val; | |
StatusWith(Status s) :status(MV(s)) {} | |
StatusWith(T val) :val(MV(val)) {} | |
bool isOK() const { return status.isOK(); } | |
const Status& getStatus() const {return status;} | |
Status& getStatus() {return status;} | |
const T& getValue() const {return *val;} | |
T& getValue() {return *val;} | |
}; | |
template <typename T> | |
constexpr bool isStatusOrStatusWith = false; | |
template <typename T> | |
constexpr bool isStatusOrStatusWith<StatusWith<T>> = true; | |
template<> | |
constexpr bool isStatusOrStatusWith<Status>[[maybe_unused]] = false; | |
template <typename T> | |
inline T uassertStatusOK(StatusWith<T>&& sw) { | |
if (!sw.isOK()) { | |
throw "placeholder for real exception"; | |
} | |
return MV(sw.getValue()); | |
} | |
template <typename T> | |
auto statusize(T&& val) { | |
return StatusWith<std::decay_t<T>>(FWD(val)); | |
} | |
template <typename T> | |
auto statusize(StatusWith<T> val) { | |
return std::move(val); | |
} | |
////////////////// | |
// SKIP TO HERE // | |
////////////////// | |
struct NEW { // Using struct rather than namespace to avoid having to forward declare most things. | |
struct FakeVoid {}; // So I don't have to work around irregular void | |
struct ScopedCancellationHandler; | |
struct CancellationToken : future_details::FutureRefCountable { | |
using Ptr = boost::intrusive_ptr<CancellationToken>; | |
enum State { kInit = 0b00, kWaiting = 0b01, kCanceling = 0b10, kCanceled = 0b11 }; | |
std::atomic<State> state{kInit}; | |
ScopedCancellationHandler* handler = nullptr; | |
bool isCanceled() const { | |
return state.load(std::memory_order_relaxed) >= kCanceling; | |
} | |
void cancel() { | |
auto oldState = state.exchange(kCanceling); | |
if (oldState == kWaiting) | |
handler->callback(); | |
state.store(kCanceled, std::memory_order_relaxed); | |
} | |
}; | |
struct ScopedCancellationHandler { | |
CancellationToken::Ptr token; | |
unique_function<void()> callback; | |
ScopedCancellationHandler() = default; | |
template <typename F> | |
explicit ScopedCancellationHandler(CancellationToken* token, F&& callback) | |
: token(token), callback(FWD(callback)) { | |
if (token) { | |
token->handler = this; | |
auto oldState = CancellationToken::kInit; | |
if (MONGO_unlikely(!token->state.compare_exchange_strong( | |
oldState, CancellationToken::kWaiting)) && | |
oldState == CancellationToken::kCanceled) { | |
token->handler->callback(); | |
token = nullptr; | |
} | |
} | |
} | |
~ScopedCancellationHandler() { | |
if (token) { | |
auto oldState = CancellationToken::kWaiting; | |
if (!token->state.compare_exchange_strong(oldState, CancellationToken::kInit)) { | |
invariant(oldState == CancellationToken::kCanceling || | |
oldState == CancellationToken::kCanceled); | |
while ((oldState = token->state.load(std::memory_order_acquire)) == | |
CancellationToken::kCanceling) { | |
// SPIN! (until it is safe to destoy callback) | |
} | |
invariant(oldState == CancellationToken::kCanceled); | |
} | |
} | |
} | |
}; | |
template <typename T, typename F> | |
struct GenericPromise { | |
F completer; | |
void complete(bool isRunningInline, StatusWith<T>&& res) noexcept { | |
completer(isRunningInline, std::move(res)); | |
} | |
}; | |
template <typename T, typename S> | |
struct MONGO_WARN_UNUSED_RESULT_CLASS GenericFuture { | |
MONGO_STATIC_ASSERT(!isStatusOrStatusWith<T>); | |
using value_type = T; | |
S submitter; | |
explicit GenericFuture(S&& s) : submitter(s) {} | |
template <typename Pt> | |
void submit(CancellationToken* ct, bool isRunningInline, Pt&& p) && noexcept { | |
submitter(ct, isRunningInline, FWD(p)); | |
} | |
template <typename Pt> | |
void submit(CancellationToken* ct, Pt&& p) && noexcept = delete; | |
template <typename Pt> | |
void submit(bool isRunningInline, Pt&& p) && noexcept { | |
submitter(nullptr, isRunningInline, FWD(p)); | |
} | |
template <typename U, typename F> | |
// lowLevelChain((CT*, bool isRunningInline, P<U>, SW<T>) -> void)) -> Fut<U> | |
auto lowLevelChain(F&& f) && noexcept { | |
return makeGF<U>([ f = FWD(f), submitter = MV(submitter) ]( | |
CancellationToken * ct, bool isRunningInline, auto&& p) mutable { | |
if (MONGO_unlikely(ct && ct->isCanceled())) | |
return; | |
submitter(ct, | |
isRunningInline, | |
makeGP<T>([ f = MV(f), p = FWD(p), ct = CancellationToken::Ptr(ct) ]( | |
bool isRunningInline, auto&& res) mutable { | |
if (MONGO_unlikely(ct && ct->isCanceled())) | |
return; | |
f(ct.get(), isRunningInline, std::move(p), std::move(res)); | |
})); | |
}); | |
} | |
template <typename F> | |
auto map(F&& f) && noexcept { // map(SW<T> -> U|SW<U>) -> Fut<U> | |
using U = typename std::result_of_t<F && (StatusWith<T> &&)>::value_type; | |
return std::move(*this).template lowLevelChain<U>([f = FWD(f)]( | |
CancellationToken * ct, | |
bool isRunningInline, | |
auto&& p, | |
StatusWith<T>&& res) mutable { | |
try { | |
p.complete(isRunningInline, f(FWD(res))); | |
} catch (DBException& ex) { | |
p.complete(isRunningInline, ex.toStatus()); | |
} | |
}); | |
} | |
template <typename F> | |
auto bind(F&& f) && noexcept { // bind(SW<T> -> Fut<U>) -> Fut<U> | |
using U = typename std::result_of_t<F && (StatusWith<T> &&)>::value_type; | |
return std::move(*this).template lowLevelChain<U>([f = FWD(f)]( | |
CancellationToken * ct, bool isRunningInline, auto&& p, auto&& res) mutable { | |
try { | |
f(FWD(res)).submit(ct, isRunningInline, FWD(p)); | |
} catch (DBException& ex) { | |
p.complete(isRunningInline, ex.toStatus()); | |
} | |
}); | |
} | |
template <typename F> | |
// TODO also suppot bind-like then overload | |
auto then(F&& f) && noexcept { // then(T -> U|SW<U>) -> Fut<U> | |
using SWU = decltype(statusize(f(std::declval<T>()))); | |
return std::move(*this).map([f = FWD(f)](auto&& sw) mutable { | |
if (sw.isOK()) | |
return SWU(f(MV(sw.getValue()))); | |
return SWU(MV(sw.getStatus())); | |
}); | |
} | |
/// Generic chaining/fluent helper | |
template <typename F, typename... Args> | |
auto chain(F&& f, Args&&... args) && noexcept { | |
return f(MV(*this), FWD(args)...); | |
} | |
auto typeErase() { | |
return Future<T>{std::move(submitter)}; | |
} | |
// | |
// get-like functions: end the future chain and either return a value or call a callback. | |
// | |
T get() && { | |
boost::optional<StatusWith<T>> out; | |
// TODO C++20 - Replace the wakeup jank with P0514-style waiting on done. | |
std::atomic<bool> done{false}; | |
SpinLock mx; | |
boost::optional<std::condition_variable_any> cv; | |
std::move(*this).submit(true, makeGP<T>([&](bool isRunningInline, auto&& res) mutable { | |
out.emplace(FWD(res)); | |
done.store(true, std::memory_order_release); | |
if (isRunningInline) | |
return; | |
std::lock_guard<SpinLock> lk(mx); | |
if (!cv) | |
return; | |
cv->notify_one(); | |
})); | |
if (!done.load(std::memory_order_acquire)) { | |
std::unique_lock<SpinLock> lk(mx); | |
if (!done.load(std::memory_order_acquire)) { | |
cv.emplace(); | |
cv->wait(lk, [&] { return done.load(std::memory_order_acquire); }); | |
} | |
} | |
return uassertStatusOK(std::move(*out)); | |
} | |
template <typename F> | |
void getAsync(F&& f) && noexcept { | |
std::move(*this).getAsync(nullptr, FWD(f)); | |
} | |
template <typename F> | |
void getAsync(CancellationToken* ct, F&& f) && noexcept { | |
std::move(*this).submit( | |
ct, true, makeGP<T>([f = FWD(f)](bool isRunningInline, auto&& res) { | |
f(FWD(res)); | |
})); | |
} | |
// | |
// Executor support | |
// | |
template <typename F, typename Executor> | |
void getAsyncOn(Executor* exec, CancellationToken* ct, F&& f) && noexcept { | |
exec->switchTo().getAsync(ct, [ ct, self = MV(*this), f = FWD(f) ]() mutable { | |
MV(self).getAsync(ct, MV(f)); | |
}); | |
} | |
template <typename Executor, typename... ExtraArgs> | |
auto switchTo(Executor* exec, ExtraArgs&&... extraArgs) && noexcept { | |
return exec->switchTo(this->typeErase(), FWD(extraArgs)...); | |
} | |
template <typename Executor, typename... ExtraArgs> | |
auto switchToIfRunningInline(Executor* exec, ExtraArgs&&... extraArgs) && noexcept { | |
return std::move(*this).lowLevelChain([exec, extraArgs...]( | |
CancellationToken* ct, bool isRunningInline, auto&& p, auto&& res) mutable { | |
if (isRunningInline) { | |
return p.complete(MV(res)); | |
} | |
exec->switchTo(makeReadyFuture(MV(res)), MV(extraArgs)...).submit(ct, p); | |
}); | |
} | |
}; | |
// | |
// Type Erased Promise and Future Types | |
// | |
template <typename T> | |
struct Promise { | |
using Func = unique_function<void(bool, StatusWith<T>)>; | |
Func completer; | |
explicit Promise(Func&& completer) : completer(FWD(completer)) {} | |
template <typename F> | |
/*implict*/ Promise(GenericPromise<T, F>&& gp) : completer(std::move(gp.completer)) {} | |
void complete(bool isRunningInline, StatusWith<T>&& res) noexcept { | |
completer(isRunningInline, std::move(res)); | |
} | |
}; | |
template <typename T> | |
struct MONGO_WARN_UNUSED_RESULT_CLASS Future | |
: GenericFuture<T, unique_function<void(CancellationToken*, bool, Promise<T>&&)>> { | |
using GenericFuture<T, unique_function<void(CancellationToken*, bool, Promise<T>&&)>>:: | |
GenericFuture; | |
template <typename S> | |
/*implicit*/ Future(GenericFuture<T, S>&& f) | |
: GenericFuture<T, unique_function<void(CancellationToken*, bool, Promise<T> &&)>>( | |
f.typeErase()){}; | |
}; | |
// | |
// Makers | |
// | |
template <typename T, typename S> | |
static auto makeGP(S&& s) { | |
return GenericPromise<T, std::decay_t<S>>{FWD(s)}; | |
} | |
template <typename T, typename S> | |
static auto makeGF(S&& s) { | |
return GenericFuture<T, std::decay_t<S>>{FWD(s)}; | |
} | |
// Evaluates f() at submit time. | |
template <typename F> | |
static auto lazyPromise(F&& f) { | |
return makeGF<decltype(f())>([f = FWD(f)]( | |
CancellationToken * ct, bool onThread, auto&& p) mutable { | |
if (MONGO_unlikely(ct && ct->isCanceled())) | |
return; | |
try { | |
p.complete(onThread, f()); | |
} catch (DBException& ex) { | |
p.complete(onThread, ex.toStatus()); | |
} | |
}); | |
} | |
template <typename T> | |
static auto makeReadyFuture(StatusWith<T> val) { | |
return makeGF<std::decay_t<T>>([val = FWD(val)]( | |
CancellationToken * ct, bool onThread, auto&& p) mutable { | |
if (MONGO_unlikely(ct && ct->isCanceled())) | |
return; | |
p.complete(onThread, std::move(val)); | |
}); | |
} | |
template <typename T> | |
static auto makeReadyFuture(T&& val) { | |
return makeGF<std::decay_t<T>>([val = FWD(val)]( | |
CancellationToken * ct, bool onThread, auto&& p) mutable { | |
if (MONGO_unlikely(ct && ct->isCanceled())) | |
return; | |
p.complete(onThread, std::move(val)); | |
}); | |
} | |
// | |
// Executors | |
// | |
struct Executor { | |
virtual ~Executor() = default; | |
virtual Future<FakeVoid> switchTo() noexcept = 0; | |
}; | |
struct ThenExecutor : Executor { | |
using Executor::switchTo; | |
virtual Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept = 0; | |
template <typename T> | |
auto switchTo(T&& when) noexcept { | |
return FWD(when).bind([this](auto&& sw) { | |
return this->switchTo().then([sw = FWD(sw)](FakeVoid) mutable { return MV(sw); }); | |
}); | |
} | |
}; | |
struct InlineExecutor final : ThenExecutor { | |
using ThenExecutor::switchTo; | |
Future<FakeVoid> switchTo() noexcept override { | |
return makeReadyFuture(FakeVoid()); | |
} | |
Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept override { | |
return std::move(when); | |
} | |
}; | |
struct NewThreadExecutor final : ThenExecutor { | |
using ThenExecutor::switchTo; | |
Future<FakeVoid> switchTo() noexcept override { | |
return makeGF<FakeVoid>( | |
[](CancellationToken* ct, bool isRunningInline, auto&& promise) mutable { | |
stdx::thread([promise = std::move(promise)]() mutable { | |
promise.complete(false, FakeVoid()); | |
}).detach(); | |
}); | |
} | |
Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept override { | |
return std::move(when).bind([this](StatusWith<FakeVoid>&& arg) { | |
return switchTo().then([arg = MV(arg)](auto&& ignore) { return std::move(arg); }); | |
}); | |
} | |
}; | |
// | |
// Deadline support | |
// | |
static Future<FakeVoid> sleepUntil(Date_t deadline) { // Date_t is our timepoint | |
// Shim models a verrrry late timeout callback. Real impl would hook up cancellation to kill | |
// the timer. | |
return makeGF<FakeVoid>([](auto&&...) {}); | |
} | |
template <typename T> | |
static Future<T> addDeadline(Future<T>&& input, Date_t deadline) noexcept { | |
// if (input.isReady()) return MV(input); | |
return makeGF<T>([ input = MV(input), deadline ]( | |
CancellationToken * ct, bool isRunningInline, auto&& promise) mutable { | |
if (ct && ct->isCanceled()) | |
return; | |
if (deadline <= Date_t::now()) | |
return promise.complete(isRunningInline, | |
Status(ErrorCodes::ExceededTimeLimit, "Timed out!")); | |
using Promise = std::decay_t<decltype(promise)>; | |
struct TimeoutState { | |
TimeoutState(CancellationToken* ct, Promise&& promise) | |
: outPromise(FWD(promise)), cancelHandler(ct, [this] { cancelAll(); }) {} | |
void cancelAll() { | |
timerCancel->cancel(); | |
inputCancel->cancel(); | |
} | |
std::atomic<bool> done{false}; | |
Promise outPromise; | |
CancellationToken::Ptr timerCancel{new CancellationToken()}; | |
CancellationToken::Ptr inputCancel{new CancellationToken()}; | |
ScopedCancellationHandler cancelHandler; | |
}; | |
auto state = std::make_shared<TimeoutState>(ct, std::move(promise)); | |
bool finishedInline = false; | |
std::move(input).submit( | |
state->inputCancel.get(), | |
/*isRunningInline*/ true, | |
makeGP<T>([ state, wasRunningInline = isRunningInline, &finishedInline ]( | |
bool isRunningInline, StatusWith<T>&& result) { | |
if (isRunningInline) { | |
finishedInline = true; | |
} else { | |
if (state->done.exchange(true)) | |
return; // timeout finished first. | |
state->timerCancel->cancel(); | |
} | |
state->outPromise.complete(isRunningInline && wasRunningInline, | |
std::move(result)); | |
})); | |
if (finishedInline) | |
return; | |
sleepUntil(deadline).submit( | |
state->timerCancel.get(), | |
isRunningInline, | |
makeGP<FakeVoid>([state](bool isRunningInline, StatusWith<FakeVoid>&& ignored) { | |
if (state->done.exchange(true)) | |
return; // input finished first | |
state->inputCancel->cancel(); | |
return state->outPromise.complete( | |
isRunningInline, Status(ErrorCodes::ExceededTimeLimit, "Timed out!")); | |
})); | |
}); | |
} | |
}; | |
int test(int arg) { | |
int x; | |
auto fut = NEW::lazyPromise([&] { return x; }) | |
.then([](auto&& x) { // | |
return x + 1; | |
}) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &x ] { return z + x; }); | |
}); | |
x = arg; | |
return std::move(fut).get(); | |
} | |
int test2(int arg) { | |
NEW::NewThreadExecutor exec; | |
return NEW::makeReadyFuture(arg) | |
.switchTo(&exec) | |
.then([](auto&& x) { return x + 1; }) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &x = arg ] { return z + x; }); | |
}) | |
.chain(NEW::addDeadline<int>, Date_t::now() + Seconds(10)) | |
.get(); | |
} | |
int test3(int arg) { | |
int out; | |
NEW::lazyPromise([&] { return arg; }) | |
.then([](auto&& x) { return x + 1; }) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &arg ] { return z + arg; }); | |
}) | |
.getAsync([&](StatusWith<int>&& sw) { out = uassertStatusOK(MV(sw)); }); | |
return out; | |
} | |
int testTE(int arg) { | |
int x; | |
auto fut = NEW::lazyPromise([&] { return x; }) | |
.typeErase() | |
.then([](auto&& x) { return x + 1; }) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &x ] { return z + x; }); | |
}); | |
x = arg; | |
return std::move(fut).get(); | |
} | |
int test2TE(int arg) { | |
return NEW::makeReadyFuture(arg) | |
.typeErase() | |
.then([](auto&& x) { return x + 1; }) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &x = arg ] { return z + x; }); | |
}) | |
.get(); | |
} | |
int test3TE(int arg) { | |
int out; | |
NEW::lazyPromise([&] { return arg; }) | |
.typeErase() | |
.then([](auto&& x) { return x + 1; }) | |
.bind([&](auto&& y) { | |
return NEW::lazyPromise([ z = y.getValue(), &arg ] { return z + arg; }); | |
}) | |
.getAsync([&](StatusWith<int>&& sw) { out = uassertStatusOK(MV(sw)); }); | |
return out; | |
} | |
template <typename Func, typename Result = std::result_of_t<Func && ()>> | |
auto slowAsync(Func&& func) { | |
return NEW::makeGF<Result>([func = FWD(func)]( | |
NEW::CancellationToken * ct, bool isRunningInline, auto&& promise) mutable { | |
stdx::thread([ promise = std::move(promise), func = std::move(func) ]() mutable { | |
std::cout << "starting thread " << std::this_thread::get_id() << std::endl; | |
try { | |
sleepmillis(100); | |
promise.complete(false, func()); | |
} catch (const DBException& ex) { | |
promise.complete(false, ex.toStatus()); | |
} | |
std::cout << "ending thread " << std::this_thread::get_id() << std::endl; | |
}).detach(); | |
}); | |
} | |
NOINLINE_DECL auto passThePotato() { | |
volatile int tries = 10; | |
std::function<NEW::Future<int>()> read = [&] { | |
return slowAsync([&]() { return --tries; }).bind([&](auto&& sw) -> NEW::Future<int> { | |
std::cout << "val: " << sw.getValue() << std::endl; | |
if (sw.getValue() == 0) { | |
sleepmillis(1000); | |
return NEW::makeReadyFuture(tries); | |
} | |
return read(); | |
}); | |
}; | |
return read().then([](int x) { return x + 0.5; }).get(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment