Skip to content

Instantly share code, notes, and snippets.

@RedBeard0531
Last active July 11, 2018 17:55
Show Gist options
  • Save RedBeard0531/1fb12c3bd9c469a8549bab6c9e22492b to your computer and use it in GitHub Desktop.
Save RedBeard0531/1fb12c3bd9c469a8549bab6c9e22492b to your computer and use it in GitHub Desktop.
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <type_traits>
#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
// Skip to around line 100. Everything up here is crappy shims
// for types in our codebase, and macro shorthands.
namespace future_details {
/// Shim base class for intrusively ref-counted objects (CRTP over
/// boost::intrusive_ref_counter), so they can be held by boost::intrusive_ptr.
struct FutureRefCountable : public boost::intrusive_ref_counter<FutureRefCountable> {
    // Virtual so that derived objects can be deleted through this base.
    // Only declared here; no definition is visible in this file.
    virtual ~FutureRefCountable();
};
}  // namespace future_details
// Shim: `stdx` is simply an alias for `std` in this file.
namespace stdx = std;
// Perfect-forwards `x` based on its declared type, so it also works on `auto&&` parameters.
#define FWD(x) std::forward<decltype(x)>(x)
// Shorthand for std::move.
#define MV(x) std::move(x)
// static_assert whose diagnostic message is the stringified condition.
#define MONGO_STATIC_ASSERT(x) static_assert(x, #x)
// Marks a class so that discarding a returned instance produces a warning.
#define MONGO_WARN_UNUSED_RESULT_CLASS [[nodiscard]]
// Asks GCC-compatible compilers not to inline the annotated function.
#define NOINLINE_DECL [[gnu::noinline]]
// Shim: expands to the bare condition; no branch-prediction hint is applied here.
#define MONGO_unlikely(x) (x)
// Shim: maps to assert(), so the condition is not evaluated when NDEBUG is defined.
#define invariant(x) assert(x)
// Shim: SpinLock is a plain std::mutex in this file.
using SpinLock = std::mutex;
// Shim: unique_function is std::function here, so stored callables must be copyable.
template<typename T>
using unique_function = std::function<T>;
// Declared only; no definition is visible in this file.
// NOTE(review): presumably blocks the calling thread for the given number of milliseconds.
void sleepmillis(int);
// Shim duration type; implicitly constructible from int. Constructor is declared only.
struct Seconds{Seconds(int);};
// Shim time-point type. All members are declared only (no definitions in this file).
struct Date_t{
// Returns a time point offset from this one by the given duration.
Date_t operator +(Seconds);
// Ordering comparison; used by addDeadline as `deadline <= Date_t::now()`.
bool operator <= (Date_t);
// NOTE(review): presumably returns the current time -- confirm against the real type.
static Date_t now();
};
// Shim error-code enum; only the one code used in this file (by addDeadline) is listed.
enum class ErrorCodes { ExceededTimeLimit };
// Forward declaration so DBException::toStatus() can name its return type.
struct Status;
// Shim exception type. The future combinators catch it and turn it into a Status
// via toStatus(). toStatus() is declared only; no definition is visible in this file.
struct DBException {
Status toStatus() const;
};
/// Shim for the codebase's Status type: it carries only an ok / not-ok flag.
struct Status {
    bool _isOK = true;

    /// A default-constructed Status is OK.
    Status() = default;

    /// Catch-all constructor standing in for `Status(ErrorCodes, reason, ...)`.
    ///
    /// Any Status built from arguments represents an error, so it must report
    /// `!isOK()`; otherwise a StatusWith built from it looks successful while
    /// holding no value. The template is disabled when the first argument is
    /// itself a Status so that copying a non-const lvalue Status uses the real
    /// copy constructor (and keeps its flag) instead of this overload.
    /// The trailing `...` is a C-style variadic that swallows the remaining
    /// arguments (e.g. the reason string).
    template <typename T,
              typename = std::enable_if_t<!std::is_same<std::decay_t<T>, Status>::value>>
    Status(T&&, ...) : _isOK(false) {}

    /// True when this Status does not represent an error.
    bool isOK() const { return _isOK; }
};
/// Shim for the codebase's StatusWith<T>: a Status plus an optional value.
/// Constructing from a Status leaves `val` empty; constructing from a T leaves
/// `status` default-constructed.
template <typename T>
struct StatusWith {
    using value_type = T;

    Status status;
    boost::optional<T> val;

    // Both constructors are intentionally implicit so that a bare Status or a
    // bare T converts to a StatusWith<T>.
    StatusWith(Status s) : status(std::move(s)) {}
    StatusWith(T value) : val(std::move(value)) {}

    /// OK-ness is decided by the stored Status only.
    bool isOK() const {
        return status.isOK();
    }

    Status& getStatus() {
        return status;
    }
    const Status& getStatus() const {
        return status;
    }

    // Callers must only ask for the value when one is held: this dereferences
    // the optional without checking.
    T& getValue() {
        return *val;
    }
    const T& getValue() const {
        return *val;
    }
};
/// Trait: true when T is `Status` or some `StatusWith<U>`, false otherwise.
/// GenericFuture uses it to reject futures whose value type is itself a status.
template <typename T>
constexpr bool isStatusOrStatusWith = false;
template <typename T>
constexpr bool isStatusOrStatusWith<StatusWith<T>> = true;
// Status itself must also be recognized; a `false` here would make this
// specialization indistinguishable from the primary template.
template <>
constexpr bool isStatusOrStatusWith<Status> [[maybe_unused]] = true;
/// Unwraps a StatusWith: returns the held value (moved out) when the status is
/// OK, otherwise throws. The thrown object is a string literal standing in for
/// the real exception type.
template <typename T>
inline T uassertStatusOK(StatusWith<T>&& sw) {
    if (sw.isOK())
        return std::move(sw.getValue());
    throw "placeholder for real exception";
}
template <typename T>
auto statusize(T&& val) {
return StatusWith<std::decay_t<T>>(FWD(val));
}
template <typename T>
auto statusize(StatusWith<T> val) {
return std::move(val);
}
//////////////////
// SKIP TO HERE //
//////////////////
struct NEW { // Using struct rather than namespace to avoid having to forward declare most things.
// Empty tag type used as the value type of futures that carry no result
// (e.g. Future<FakeVoid> returned by the executors).
struct FakeVoid {}; // So I don't have to work around irregular void
// Forward declaration: CancellationToken stores a pointer to its registered handler.
struct ScopedCancellationHandler;
/// Ref-counted cancellation flag with at most one registered handler.
///
/// State machine:
///   kInit      - not canceled, no handler registered
///   kWaiting   - not canceled, `handler` points at a live ScopedCancellationHandler
///   kCanceling - cancel() is in progress (the handler callback may be running)
///   kCanceled  - cancel() has finished
struct CancellationToken : future_details::FutureRefCountable {
    using Ptr = boost::intrusive_ptr<CancellationToken>;
    enum State { kInit = 0b00, kWaiting = 0b01, kCanceling = 0b10, kCanceled = 0b11 };

    std::atomic<State> state{kInit};
    // Written by ScopedCancellationHandler before it publishes kWaiting; only
    // read by cancel() after it has observed kWaiting.
    ScopedCancellationHandler* handler = nullptr;

    /// True once cancel() has started (kCanceling) or finished (kCanceled).
    bool isCanceled() const {
        return state.load(std::memory_order_relaxed) >= kCanceling;
    }

    /// Cancels the token and runs the registered handler's callback, if any.
    ///
    /// Only the call that actually moves the state to kCanceling runs the
    /// callback and publishes kCanceled; any later or concurrent call returns
    /// immediately. This matters because the handler's destructor spins until
    /// it sees kCanceled: a second caller must never publish kCanceled while
    /// the first is still inside the callback.
    void cancel() {
        auto oldState = state.load(std::memory_order_relaxed);
        do {
            if (oldState >= kCanceling)
                return;  // Another cancel() already owns the transition.
        } while (!state.compare_exchange_weak(
            oldState, kCanceling, std::memory_order_acq_rel, std::memory_order_relaxed));

        if (oldState == kWaiting)
            handler->callback();

        // Release pairs with the acquire load in ~ScopedCancellationHandler so
        // the callback invocation happens-before the handler is destroyed.
        state.store(kCanceled, std::memory_order_release);
    }
};
/// Registers `callback` on a CancellationToken for the lifetime of this object.
///
/// If the token is canceled while the handler is registered, cancel() invokes
/// the callback. If the token is already canceled (or being canceled) at
/// construction time, the constructor invokes the callback itself. The
/// destructor unregisters, waiting for an in-flight cancel() to finish first.
struct ScopedCancellationHandler {
    CancellationToken::Ptr token;
    unique_function<void()> callback;

    ScopedCancellationHandler() = default;

    // The token stores this object's address, so copying would leave the token
    // pointing at the wrong (or a dead) handler.
    ScopedCancellationHandler(const ScopedCancellationHandler&) = delete;
    ScopedCancellationHandler& operator=(const ScopedCancellationHandler&) = delete;

    /// @param tok token to register on; may be null, in which case nothing is registered.
    /// @param cb  callable invoked (at most once) when the token is canceled.
    template <typename F>
    explicit ScopedCancellationHandler(CancellationToken* tok, F&& cb)
        : token(tok), callback(FWD(cb)) {
        if (!token)
            return;

        // Publish the handler pointer before making kWaiting visible.
        token->handler = this;
        auto oldState = CancellationToken::kInit;
        if (MONGO_unlikely(!token->state.compare_exchange_strong(
                oldState, CancellationToken::kWaiting)) &&
            oldState >= CancellationToken::kCanceling) {
            // Cancellation already started or finished before we registered.
            // A cancel() that is still in progress observed kInit and will not
            // call us, so the callback has to be run here in both cases.
            callback();
            // Nothing is registered, so the destructor has nothing to undo.
            token.reset();
        }
    }

    ~ScopedCancellationHandler() {
        if (!token)
            return;

        auto oldState = CancellationToken::kWaiting;
        if (token->state.compare_exchange_strong(oldState, CancellationToken::kInit))
            return;  // Unregistered before any cancellation started.

        invariant(oldState == CancellationToken::kCanceling ||
                  oldState == CancellationToken::kCanceled);
        while ((oldState = token->state.load(std::memory_order_acquire)) ==
               CancellationToken::kCanceling) {
            // SPIN! (until it is safe to destroy callback)
        }
        invariant(oldState == CancellationToken::kCanceled);
    }
};
template <typename T, typename F>
struct GenericPromise {
F completer;
void complete(bool isRunningInline, StatusWith<T>&& res) noexcept {
completer(isRunningInline, std::move(res));
}
};
/// Lazy future whose behavior is defined by a "submitter" callable.
///
/// Nothing runs until one of the submit()/get*() functions is called. The
/// submitter is invoked as `submitter(CancellationToken* ct, bool isRunningInline,
/// promise)` and is expected to eventually call
/// `promise.complete(isRunningInline, StatusWith<T>)`.
///
/// `isRunningInline` is threaded through every continuation. get() submits
/// with `true`, and producers that finish on another thread (e.g.
/// NewThreadExecutor) complete with `false`; a continuation that sees `true`
/// is therefore still running synchronously inside the original submit() call.
template <typename T, typename S>
struct MONGO_WARN_UNUSED_RESULT_CLASS GenericFuture {
    MONGO_STATIC_ASSERT(!isStatusOrStatusWith<T>);
    using value_type = T;

    S submitter;

    // `s` is an rvalue reference, so take ownership by moving instead of copying.
    explicit GenericFuture(S&& s) : submitter(MV(s)) {}

    /// Starts the future, delivering the eventual result to promise `p`.
    template <typename Pt>
    void submit(CancellationToken* ct, bool isRunningInline, Pt&& p) && noexcept {
        submitter(ct, isRunningInline, FWD(p));
    }

    // Deleted so that a call that forgets `isRunningInline` fails to compile.
    template <typename Pt>
    void submit(CancellationToken* ct, Pt&& p) && noexcept = delete;

    /// Same as the three-argument overload, with no cancellation token.
    template <typename Pt>
    void submit(bool isRunningInline, Pt&& p) && noexcept {
        submitter(nullptr, isRunningInline, FWD(p));
    }

    /// Building block for the other combinators.
    /// lowLevelChain((CT*, bool isRunningInline, P<U>, SW<T>) -> void)) -> Fut<U>
    ///
    /// Returns a Fut<U> that, when submitted, submits this future and calls
    /// `f` with the downstream promise and this future's result. Cancellation
    /// is checked both before submitting upstream and before invoking `f`; when
    /// canceled, the downstream promise is never completed.
    template <typename U, typename F>
    auto lowLevelChain(F&& f) && noexcept {
        return makeGF<U>([ f = FWD(f), submitter = MV(submitter) ](
            CancellationToken * ct, bool isRunningInline, auto&& p) mutable {
            if (MONGO_unlikely(ct && ct->isCanceled()))
                return;
            submitter(ct,
                      isRunningInline,
                      // The token is held by intrusive_ptr so it stays alive
                      // until the upstream result arrives.
                      makeGP<T>([ f = MV(f), p = FWD(p), ct = CancellationToken::Ptr(ct) ](
                          bool isRunningInline, auto&& res) mutable {
                          if (MONGO_unlikely(ct && ct->isCanceled()))
                              return;
                          f(ct.get(), isRunningInline, std::move(p), std::move(res));
                      }));
        });
    }

    /// map(SW<T> -> SW<U>) -> Fut<U>
    /// `f` receives the full StatusWith<T>, so it sees errors as well as values.
    /// U is taken from the `value_type` of f's result, so f must return a
    /// StatusWith. A DBException thrown by `f` becomes an error result.
    template <typename F>
    auto map(F&& f) && noexcept {
        using U = typename std::result_of_t<F && (StatusWith<T> &&)>::value_type;
        return std::move(*this).template lowLevelChain<U>([f = FWD(f)](
            CancellationToken * ct,
            bool isRunningInline,
            auto&& p,
            StatusWith<T>&& res) mutable {
            try {
                p.complete(isRunningInline, f(FWD(res)));
            } catch (const DBException& ex) {
                p.complete(isRunningInline, ex.toStatus());
            }
        });
    }

    /// bind(SW<T> -> Fut<U>) -> Fut<U>
    /// `f` returns another future, which is submitted with the same token and
    /// the downstream promise. A DBException thrown by `f` becomes an error result.
    template <typename F>
    auto bind(F&& f) && noexcept {
        using U = typename std::result_of_t<F && (StatusWith<T> &&)>::value_type;
        return std::move(*this).template lowLevelChain<U>([f = FWD(f)](
            CancellationToken * ct, bool isRunningInline, auto&& p, auto&& res) mutable {
            try {
                f(FWD(res)).submit(ct, isRunningInline, FWD(p));
            } catch (const DBException& ex) {
                p.complete(isRunningInline, ex.toStatus());
            }
        });
    }

    /// then(T -> U|SW<U>) -> Fut<U>
    /// `f` is only called on success; an error status is passed through untouched.
    // TODO also support bind-like then overload
    template <typename F>
    auto then(F&& f) && noexcept {
        using SWU = decltype(statusize(f(std::declval<T>())));
        return std::move(*this).map([f = FWD(f)](auto&& sw) mutable {
            if (sw.isOK())
                return SWU(f(MV(sw.getValue())));
            return SWU(MV(sw.getStatus()));
        });
    }

    /// Generic chaining/fluent helper: returns `f(std::move(*this), args...)`.
    template <typename F, typename... Args>
    auto chain(F&& f, Args&&... args) && noexcept {
        return f(MV(*this), FWD(args)...);
    }

    /// Converts to the type-erased Future<T>. The submitter is moved out, so
    /// this object must not be submitted afterwards.
    auto typeErase() {
        return Future<T>{std::move(submitter)};
    }

    //
    // get-like functions: end the future chain and either return a value or call a callback.
    //

    /// Submits the future and blocks until it completes, then returns the value
    /// or throws via uassertStatusOK.
    ///
    /// The completion callback refers to this function's locals, so this
    /// function must not return while the callback can still touch them:
    ///  - An inline completion runs on this thread inside submit(); it only
    ///    sets `completedInline` and needs no synchronization.
    ///  - Any other completion sets `done` and notifies while holding `mx`, and
    ///    the waiter always takes `mx` before returning. The waiter therefore
    ///    cannot leave (and destroy `mx`/`cv`) until the callback is finished
    ///    with them.
    T get() && {
        boost::optional<StatusWith<T>> out;
        bool completedInline = false;  // Only written by this thread, inside submit().
        bool done = false;             // Guarded by mx.
        SpinLock mx;
        // Guarded by mx. Created only when this thread actually has to wait.
        // TODO C++20 - Replace the wakeup jank with P0514-style waiting on done.
        boost::optional<std::condition_variable_any> cv;

        std::move(*this).submit(true, makeGP<T>([&](bool isRunningInline, auto&& res) mutable {
            out.emplace(FWD(res));
            if (isRunningInline) {
                completedInline = true;
                return;
            }
            std::lock_guard<SpinLock> lk(mx);
            done = true;
            if (cv)
                cv->notify_one();
        }));

        if (!completedInline) {
            std::unique_lock<SpinLock> lk(mx);
            if (!done) {
                cv.emplace();
                cv->wait(lk, [&] { return done; });
            }
        }
        return uassertStatusOK(std::move(*out));
    }

    /// Submits the future and passes the resulting StatusWith<T> to `f`.
    template <typename F>
    void getAsync(F&& f) && noexcept {
        std::move(*this).getAsync(nullptr, FWD(f));
    }

    /// As above, with a cancellation token.
    template <typename F>
    void getAsync(CancellationToken* ct, F&& f) && noexcept {
        // `mutable` so that callables with a non-const call operator are
        // accepted, as they are by map() and bind().
        std::move(*this).submit(
            ct, true, makeGP<T>([f = FWD(f)](bool isRunningInline, auto&& res) mutable {
                f(FWD(res));
            }));
    }

    //
    // Executor support
    //

    /// Hops to `exec` first, then runs getAsync(ct, f) from there. If the hop
    /// itself reports an error, `f` receives that error instead.
    /// The raw `ct` pointer is captured, so the caller must keep the token
    /// alive until the continuation has run.
    template <typename F, typename Executor>
    void getAsyncOn(Executor* exec, CancellationToken* ct, F&& f) && noexcept {
        exec->switchTo().getAsync(
            ct,
            // getAsync() calls its callback with the StatusWith produced by the
            // executor's future, so the continuation must accept that argument.
            [ ct, self = MV(*this), f = FWD(f) ](StatusWith<FakeVoid> && scheduled) mutable {
                if (!scheduled.isOK()) {
                    f(StatusWith<T>(MV(scheduled.getStatus())));
                    return;
                }
                MV(self).getAsync(ct, MV(f));
            });
    }

    /// Type-erases this future and hands it to `exec->switchTo(...)`.
    template <typename Executor, typename... ExtraArgs>
    auto switchTo(Executor* exec, ExtraArgs&&... extraArgs) && noexcept {
        return exec->switchTo(this->typeErase(), FWD(extraArgs)...);
    }

    /// Continues on `exec` only when the result arrived inline. A result that
    /// is already off the submitting thread is passed straight through.
    template <typename Executor, typename... ExtraArgs>
    auto switchToIfRunningInline(Executor* exec, ExtraArgs&&... extraArgs) && noexcept {
        return std::move(*this).template lowLevelChain<T>([exec, extraArgs...](
            CancellationToken* ct, bool isRunningInline, auto&& p, auto&& res) mutable {
            if (!isRunningInline) {
                return p.complete(isRunningInline, MV(res));
            }
            exec->switchTo(makeReadyFuture(MV(res)), MV(extraArgs)...)
                .submit(ct, isRunningInline, MV(p));
        });
    }
};
//
// Type Erased Promise and Future Types
//
template <typename T>
struct Promise {
using Func = unique_function<void(bool, StatusWith<T>)>;
Func completer;
explicit Promise(Func&& completer) : completer(FWD(completer)) {}
template <typename F>
/*implict*/ Promise(GenericPromise<T, F>&& gp) : completer(std::move(gp.completer)) {}
void complete(bool isRunningInline, StatusWith<T>&& res) noexcept {
completer(isRunningInline, std::move(res));
}
};
/// Type-erased future: a GenericFuture whose submitter is a unique_function
/// taking a type-erased Promise<T>.
template <typename T>
struct MONGO_WARN_UNUSED_RESULT_CLASS Future
    : GenericFuture<T, unique_function<void(CancellationToken*, bool, Promise<T>&&)>> {
private:
    using ErasedBase =
        GenericFuture<T, unique_function<void(CancellationToken*, bool, Promise<T>&&)>>;

public:
    // Inherit the base constructor that takes the erased submitter.
    using ErasedBase::ErasedBase;

    /// Any GenericFuture<T, S> converts implicitly by erasing its submitter.
    template <typename S>
    /*implicit*/ Future(GenericFuture<T, S>&& f) : ErasedBase(f.typeErase()) {}
};
//
// Makers
//
/// Wraps a completion callable in a GenericPromise<T, ...>, deducing the callable type.
template <typename T, typename S>
static auto makeGP(S&& s) {
    using Completer = std::decay_t<S>;
    return GenericPromise<T, Completer>{std::forward<S>(s)};
}
/// Wraps a submitter callable in a GenericFuture<T, ...>, deducing the callable type.
template <typename T, typename S>
static auto makeGF(S&& s) {
    using Submitter = std::decay_t<S>;
    return GenericFuture<T, Submitter>{std::forward<S>(s)};
}
// Evaluates f() at submit time.
template <typename F>
static auto lazyPromise(F&& f) {
return makeGF<decltype(f())>([f = FWD(f)](
CancellationToken * ct, bool onThread, auto&& p) mutable {
if (MONGO_unlikely(ct && ct->isCanceled()))
return;
try {
p.complete(onThread, f());
} catch (DBException& ex) {
p.complete(onThread, ex.toStatus());
}
});
}
/// Future that completes immediately on submit with the given StatusWith
/// (unless the token is already canceled, in which case it never completes).
template <typename T>
static auto makeReadyFuture(StatusWith<T> val) {
    using Value = std::decay_t<T>;
    auto submitter = [ready = std::move(val)](
        CancellationToken * ct, bool onThread, auto&& p) mutable {
        const bool canceled = ct && ct->isCanceled();
        if (MONGO_unlikely(canceled))
            return;
        p.complete(onThread, std::move(ready));
    };
    return makeGF<Value>(std::move(submitter));
}
/// Same, for a plain value; the value converts to a StatusWith when the
/// promise is completed.
template <typename T>
static auto makeReadyFuture(T&& val) {
    using Value = std::decay_t<T>;
    auto submitter = [ready = std::forward<T>(val)](
        CancellationToken * ct, bool onThread, auto&& p) mutable {
        const bool canceled = ct && ct->isCanceled();
        if (MONGO_unlikely(canceled))
            return;
        p.complete(onThread, std::move(ready));
    };
    return makeGF<Value>(std::move(submitter));
}
//
// Executors
//
// Minimal executor interface: the only operation is obtaining a future to chain work onto.
struct Executor {
virtual ~Executor() = default;
// Returns a Future<FakeVoid>; the concrete executors in this file complete it
// either inline (InlineExecutor) or from a new thread (NewThreadExecutor).
virtual Future<FakeVoid> switchTo() noexcept = 0;
};
/// Executor that can also be switched to *after* another future completes.
struct ThenExecutor : Executor {
    using Executor::switchTo;

    /// Returns a future that completes once `when` has completed and the
    /// executor has been switched to.
    virtual Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept = 0;

    /// Generic version for futures of any value type: waits for `when`, hops to
    /// this executor via switchTo(), then re-delivers `when`'s result
    /// (value or error) from there.
    template <typename T>
    auto switchTo(T&& when) noexcept {
        auto hop = [this](auto&& sw) {
            // Carry the upstream result across the hop and hand it back afterwards.
            auto redeliver = [sw = FWD(sw)](FakeVoid) mutable { return MV(sw); };
            return this->switchTo().then(std::move(redeliver));
        };
        return FWD(when).bind(std::move(hop));
    }
};
// Executor that never leaves the current thread.
struct InlineExecutor final : ThenExecutor {
using ThenExecutor::switchTo;
// Switching is a no-op: returns an already-ready future.
Future<FakeVoid> switchTo() noexcept override {
return makeReadyFuture(FakeVoid());
}
// Nothing to hop to, so the input future is returned unchanged.
Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept override {
return std::move(when);
}
};
/// Executor that runs each continuation on a freshly spawned, detached thread.
struct NewThreadExecutor final : ThenExecutor {
    using ThenExecutor::switchTo;

    /// Returns a future that, on submit, spawns a detached thread which
    /// completes the promise with `isRunningInline == false`.
    Future<FakeVoid> switchTo() noexcept override {
        return makeGF<FakeVoid>(
            [](CancellationToken* ct, bool isRunningInline, auto&& promise) mutable {
                // Same contract as lazyPromise/makeReadyFuture: a canceled
                // token means the promise is never completed, so don't spawn
                // a thread just to have the result dropped.
                if (MONGO_unlikely(ct && ct->isCanceled()))
                    return;
                stdx::thread([promise = std::move(promise)]() mutable {
                    promise.complete(false, FakeVoid());
                }).detach();
            });
    }

    /// Waits for `when`, then hops to a new thread and re-delivers `when`'s
    /// result from there. Captures `this`, so the executor must outlive the
    /// returned future.
    Future<FakeVoid> switchTo(Future<FakeVoid>&& when) noexcept override {
        return std::move(when).bind([this](StatusWith<FakeVoid>&& arg) {
            // `mutable` so that `arg` is actually moved out; in a const lambda
            // std::move(arg) would silently copy.
            return switchTo().then(
                [arg = MV(arg)](auto&& ignore) mutable { return std::move(arg); });
        });
    }
};
//
// Deadline support
//
// Shim timer: returns a future whose submitter accepts any arguments and does
// nothing, so the promise is never completed. `deadline` is unused here.
static Future<FakeVoid> sleepUntil(Date_t deadline) { // Date_t is our timepoint
// Shim models a verrrry late timeout callback. Real impl would hook up cancellation to kill
// the timer.
return makeGF<FakeVoid>([](auto&&...) {});
}
// Wraps `input` in a future that fails with ExceededTimeLimit if `deadline`
// passes before `input` completes.
//
// On submit:
//  - if the caller's token is already canceled, nothing happens;
//  - if the deadline has already passed, the promise is completed with the
//    timeout status right away and `input` is never submitted;
//  - otherwise `input` and a sleepUntil(deadline) timer are both submitted, each
//    with its own CancellationToken. Whichever finishes first flips `done`,
//    cancels the other one's token, and completes the outgoing promise.
template <typename T>
static Future<T> addDeadline(Future<T>&& input, Date_t deadline) noexcept {
// if (input.isReady()) return MV(input);
return makeGF<T>([ input = MV(input), deadline ](
CancellationToken * ct, bool isRunningInline, auto&& promise) mutable {
if (ct && ct->isCanceled())
return;
if (deadline <= Date_t::now())
return promise.complete(isRunningInline,
Status(ErrorCodes::ExceededTimeLimit, "Timed out!"));
// Note: this local alias shadows NEW::Promise for the rest of the lambda.
using Promise = std::decay_t<decltype(promise)>;
// State shared (via shared_ptr) between the input callback and the timer callback.
struct TimeoutState {
// Registers cancelAll() on the caller's token, so canceling the outer
// operation cancels both the input and the timer.
TimeoutState(CancellationToken* ct, Promise&& promise)
: outPromise(FWD(promise)), cancelHandler(ct, [this] { cancelAll(); }) {}
void cancelAll() {
timerCancel->cancel();
inputCancel->cancel();
}
// Set by whichever of the two callbacks gets there first.
std::atomic<bool> done{false};
Promise outPromise;
CancellationToken::Ptr timerCancel{new CancellationToken()};
CancellationToken::Ptr inputCancel{new CancellationToken()};
// Declared last so both tokens above already exist if the handler's
// constructor runs cancelAll() immediately.
ScopedCancellationHandler cancelHandler;
};
auto state = std::make_shared<TimeoutState>(ct, std::move(promise));
// Set by the input callback only when it runs inline, i.e. during the submit()
// call below. The callback captures it by reference but never touches it on
// the non-inline path.
bool finishedInline = false;
std::move(input).submit(
state->inputCancel.get(),
/*isRunningInline*/ true,
makeGP<T>([ state, wasRunningInline = isRunningInline, &finishedInline ](
bool isRunningInline, StatusWith<T>&& result) {
if (isRunningInline) {
// The timer has not been started yet, so there is nothing to race with.
finishedInline = true;
} else {
if (state->done.exchange(true))
return; // timeout finished first.
state->timerCancel->cancel();
}
// Downstream is only "inline" if both this hop and the outer submit were inline.
state->outPromise.complete(isRunningInline && wasRunningInline,
std::move(result));
}));
if (finishedInline)
return;
// Input is still pending: arm the timer.
sleepUntil(deadline).submit(
state->timerCancel.get(),
isRunningInline,
makeGP<FakeVoid>([state](bool isRunningInline, StatusWith<FakeVoid>&& ignored) {
if (state->done.exchange(true))
return; // input finished first
state->inputCancel->cancel();
return state->outPromise.complete(
isRunningInline, Status(ErrorCodes::ExceededTimeLimit, "Timed out!"));
}));
});
}
};
// Builds a lazy chain that reads `x` by reference, assigns `x` afterwards, and
// only then runs the chain with get(). Nothing reads `x` before it is assigned.
int test(int arg) {
    int x;
    auto increment = [](auto&& v) { return v + 1; };
    auto addX = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &x ] { return z + x; });
    };
    auto fut = NEW::lazyPromise([&] { return x; })
                   .then(std::move(increment))
                   .bind(std::move(addX));
    x = arg;
    return std::move(fut).get();
}
// Ready value -> hop to a new thread -> +1 -> + arg, with a 10 second deadline, then block.
int test2(int arg) {
    NEW::NewThreadExecutor exec;
    auto increment = [](auto&& v) { return v + 1; };
    auto addArg = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &x = arg ] { return z + x; });
    };
    return NEW::makeReadyFuture(arg)
        .switchTo(&exec)
        .then(std::move(increment))
        .bind(std::move(addArg))
        .chain(NEW::addDeadline<int>, Date_t::now() + Seconds(10))
        .get();
}
// Same chain as test(), but consumed through getAsync(): the callback stores
// the result into `out`, which is returned afterwards.
int test3(int arg) {
    int out;
    auto increment = [](auto&& v) { return v + 1; };
    auto addArg = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &arg ] { return z + arg; });
    };
    auto store = [&](StatusWith<int>&& sw) { out = uassertStatusOK(MV(sw)); };
    NEW::lazyPromise([&] { return arg; })
        .then(std::move(increment))
        .bind(std::move(addArg))
        .getAsync(std::move(store));
    return out;
}
// Type-erased variant of test(): the first future is converted with typeErase()
// before the rest of the chain is attached.
int testTE(int arg) {
    int x;
    auto increment = [](auto&& v) { return v + 1; };
    auto addX = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &x ] { return z + x; });
    };
    auto fut = NEW::lazyPromise([&] { return x; })
                   .typeErase()
                   .then(std::move(increment))
                   .bind(std::move(addX));
    x = arg;
    return std::move(fut).get();
}
// Type-erased ready future -> +1 -> + arg, then block for the result.
int test2TE(int arg) {
    auto increment = [](auto&& v) { return v + 1; };
    auto addArg = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &x = arg ] { return z + x; });
    };
    return NEW::makeReadyFuture(arg)
        .typeErase()
        .then(std::move(increment))
        .bind(std::move(addArg))
        .get();
}
// Type-erased variant of test3(): the result is delivered through getAsync()
// into `out`.
int test3TE(int arg) {
    int out;
    auto increment = [](auto&& v) { return v + 1; };
    auto addArg = [&](auto&& sw) {
        return NEW::lazyPromise([ z = sw.getValue(), &arg ] { return z + arg; });
    };
    auto store = [&](StatusWith<int>&& sw) { out = uassertStatusOK(MV(sw)); };
    NEW::lazyPromise([&] { return arg; })
        .typeErase()
        .then(std::move(increment))
        .bind(std::move(addArg))
        .getAsync(std::move(store));
    return out;
}
// Returns a future that, on submit, spawns a detached thread which logs, calls
// sleepmillis(100), runs func(), and completes the promise with the result
// (or with the status of a thrown DBException). The promise is always
// completed with isRunningInline == false.
template <typename Func, typename Result = std::result_of_t<Func && ()>>
auto slowAsync(Func&& func) {
    auto submitter = [func = FWD(func)](
        NEW::CancellationToken * ct, bool isRunningInline, auto&& promise) mutable {
        auto worker = [ promise = std::move(promise), func = std::move(func) ]() mutable {
            const auto self = std::this_thread::get_id();
            std::cout << "starting thread " << self << std::endl;
            try {
                sleepmillis(100);
                promise.complete(false, func());
            } catch (const DBException& ex) {
                promise.complete(false, ex.toStatus());
            }
            std::cout << "ending thread " << self << std::endl;
        };
        stdx::thread(std::move(worker)).detach();
    };
    return NEW::makeGF<Result>(std::move(submitter));
}
// Demo of an asynchronous loop: `read` starts a slowAsync that decrements
// `tries`, then either finishes (when the decremented value is 0) or calls
// itself again from the continuation. The final value is mapped to a double
// (x + 0.5) and fetched with a blocking get().
NOINLINE_DECL auto passThePotato() {
// NOTE(review): `volatile` presumably just keeps the optimizer from folding the
// countdown; it is not a synchronization primitive -- confirm intent.
volatile int tries = 10;
// `read` is captured by reference inside its own body so it can recurse.
// The continuations run on slowAsync's detached threads and refer to `tries`
// and `read`, both locals of this function.
// NOTE(review): this appears to rely on get() blocking until the whole chain
// has finished, so that those locals outlive every continuation -- confirm.
std::function<NEW::Future<int>()> read = [&] {
return slowAsync([&]() { return --tries; }).bind([&](auto&& sw) -> NEW::Future<int> {
std::cout << "val: " << sw.getValue() << std::endl;
if (sw.getValue() == 0) {
sleepmillis(1000);
return NEW::makeReadyFuture(tries);
}
return read();
});
};
return read().then([](int x) { return x + 0.5; }).get();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment