Created
November 8, 2023 08:24
-
-
Save ericniebler/3c810343d153af4c0edaeb4f4751bc39 to your computer and use it in GitHub Desktop.
A demonstration of how execution domains ensure the correct sender algorithm implementation is selected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <concepts>
#include <iostream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include "stdexec/execution.hpp"
#include "exec/env.hpp"

namespace ex = stdexec;
using namespace std::literals;
template <class Sender, class Tag> | |
concept sender_for = | |
ex::sender<Sender> && std::same_as<ex::tag_of_t<Sender>, Tag>; | |
template <class From, class To> | |
concept decays_to = | |
std::same_as<std::decay_t<From>, To>; | |
// The type produced by invoking `Fun` with arguments of types `Args...`.
// `Fun` is a callable object supplied by reference as a non-type template
// parameter, so `decltype(Fun)` is the (const reference) type of that object.
template <const auto& Fun, class... Args>
using result_of = std::invoke_result_t<decltype(Fun), Args...>;
template <class From, class To> | |
using copy_cvref_t = stdexec::__copy_cvref_t<From, To>; | |
template <class Tag, class Data, class... Child> | |
auto make_sender(Tag, Data data, Child... child) { | |
return stdexec::__make_sexpr<Tag>(std::move(data), std::move(child)...); | |
} | |
////////////////////////////////////////////////////////////// | |
// a scheduler adaptor that gives it a particular domain | |
template <class Scheduler, class Domain> | |
struct with_domain { | |
explicit with_domain(Scheduler sched) | |
: sched_(sched) {} | |
bool operator==(const with_domain&) const = default; | |
private: | |
struct sender_ { | |
using is_sender = void; | |
private: | |
friend with_domain; | |
sender_(with_domain sched, ex::schedule_result_t<Scheduler> sndr) | |
: sched_(sched), sndr_(sndr) {} | |
template <class Env> | |
friend auto tag_invoke(ex::get_completion_signatures_t, sender_, Env) noexcept | |
-> ex::completion_signatures_of_t<ex::schedule_result_t<Scheduler>, Env> { | |
return {}; | |
} | |
template <class Receiver> | |
friend auto tag_invoke(ex::connect_t, sender_ self, Receiver rcvr) | |
-> ex::connect_result_t<ex::schedule_result_t<Scheduler>, Receiver> { | |
return ex::connect(self.sndr_, rcvr); | |
} | |
friend auto tag_invoke(ex::get_env_t, sender_ self) noexcept { | |
return exec::make_env( | |
ex::get_env(self.sndr_), | |
exec::make_env( | |
exec::with(ex::get_completion_scheduler<ex::set_value_t>, self.sched_), | |
exec::with(ex::get_domain, Domain()))); | |
} | |
with_domain sched_; | |
ex::schedule_result_t<Scheduler> sndr_; | |
}; | |
friend Domain tag_invoke(ex::get_domain_t, with_domain) noexcept { | |
return Domain(); | |
} | |
friend sender_ tag_invoke(ex::schedule_t, with_domain self) noexcept { | |
return {self, ex::schedule(self.sched_)}; | |
} | |
Scheduler sched_; | |
}; | |
template <class Domain, class Scheduler> | |
auto make_sched_with_domain(Scheduler sched) { | |
return with_domain<Scheduler, Domain>{sched}; | |
} | |
// The name of the current thread. Each thread has its own copy, which stays
// empty until that thread assigns to it.
thread_local std::string thread_name{};
////////////////////////////////////////////////////////////// | |
// an execution context wrapping a run loop in a single thread | |
template <class Domain> | |
struct thread_context { | |
explicit thread_context(std::string name) | |
: thr_([this](auto name){ | |
thread_name = name; | |
loop_.run(); | |
}, name) {} | |
~thread_context() { | |
loop_.finish(); | |
thr_.join(); | |
} | |
void finish() { | |
loop_.finish(); | |
} | |
auto get_scheduler() { | |
return make_sched_with_domain<Domain>(loop_.get_scheduler()); | |
} | |
private: | |
ex::run_loop loop_; | |
std::thread thr_; | |
}; | |
// The tag used for the trace sender. An empty type; trace_then_sender stores
// one as its first data member.
struct trace_then_t {};
template <class Receiver, class Fun> | |
struct trace_then_receiver { | |
using is_receiver = void; | |
Receiver rcvr; | |
Fun fun; | |
std::string name; | |
friend void tag_invoke(ex::set_value_t, trace_then_receiver&& self) noexcept { | |
std::cout << "then sender from domain " << self.name << '\n'; | |
self.fun(); | |
ex::set_value(std::move(self.rcvr)); | |
} | |
template <class Error> | |
friend void tag_invoke(ex::set_error_t, trace_then_receiver&& self, Error&& e) noexcept { | |
ex::set_error(std::move(self.rcvr), (Error&&) e); | |
} | |
friend void tag_invoke(ex::set_stopped_t, trace_then_receiver&& self) noexcept { | |
ex::set_stopped(std::move(self.rcvr)); | |
} | |
friend decltype(auto) tag_invoke(ex::get_env_t, const trace_then_receiver& self) noexcept { | |
return ex::get_env(self.rcvr); | |
} | |
}; | |
template <class Receiver, class Fun> | |
trace_then_receiver(Receiver, Fun, std::string) -> trace_then_receiver<Receiver, Fun>; | |
template <class Sender, class Fun> | |
struct trace_then_sender { | |
using is_sender = void; | |
trace_then_sender(Sender sndr, Fun fun, std::string name) | |
: sndr(std::move(sndr)) | |
, fun(std::move(fun)) | |
, name(std::move(name)) | |
{} | |
[[no_unique_address]] trace_then_t tag; | |
Sender sndr; | |
Fun fun; | |
std::string name; | |
friend decltype(auto) tag_invoke(ex::get_env_t, const trace_then_sender& self) noexcept { | |
return ex::get_env(self.sndr); | |
} | |
template <decays_to<trace_then_sender> Self, class Env> | |
requires ex::sender_in<copy_cvref_t<Self, Sender>, Env> | |
friend decltype(auto) tag_invoke(ex::get_completion_signatures_t, Self&& self, const Env& env) noexcept { | |
return ex::get_completion_signatures(((Self&&) self).sndr, env); | |
} | |
template <decays_to<trace_then_sender> Self, class Receiver> | |
requires ex::sender_to<copy_cvref_t<Self, Sender>, Receiver> | |
friend decltype(auto) tag_invoke(ex::connect_t, Self&& self, Receiver rcvr) { | |
std::cout << "connect then sender from domain " << self.name | |
<< " on thread " << thread_name << '\n'; | |
return ex::connect(((Self&&) self).sndr, trace_then_receiver{std::move(rcvr), self.fun, self.name}); | |
} | |
}; | |
template <class Sender, class Fun> | |
trace_then_sender(Sender, Fun, std::string) -> trace_then_sender<Sender, Fun>; | |
struct domainA { | |
template <sender_for<ex::then_t> Sender, class... Env> | |
auto transform_sender(Sender&& sndr, const Env&... env) const { | |
std::cout << "hello from domain A transform_sender, " | |
<< (sizeof...(env) ? "late" : "early") | |
<< '\n'; | |
auto [tag, fun, child] = (Sender&&) sndr; | |
return trace_then_sender(std::move(child), std::move(fun), "A"); | |
} | |
}; | |
struct domainB { | |
template <sender_for<ex::then_t> Sender, class... Env> | |
auto transform_sender(Sender&& sndr, const Env&... env) const { | |
std::cout << "hello from domain B transform_sender, " | |
<< (sizeof...(env) ? "late" : "early") | |
<< '\n'; | |
auto [tag, fun, child] = (Sender&&) sndr; | |
return trace_then_sender(std::move(child), std::move(fun), "B"); | |
} | |
}; | |
int main() { | |
thread_context<domainA> ctxA("thread A"); | |
thread_context<domainB> ctxB("thread B"); | |
auto schedA = ctxA.get_scheduler(); | |
auto schedB = ctxB.get_scheduler(); | |
auto hello = [] { std::cout << " running on thread " << thread_name << '\n'; }; | |
auto work = | |
ex::just() // no domain here | |
| ex::then(hello) // no domain here | |
| ex::transfer(schedA) // transition into domain A | |
| ex::then(hello); // "hello from domain A transform_sender, early" | |
// (using predecessor's domain via completion scheduler) | |
// work: | |
// trace("A", transfer(A, then(hello, just()))) | |
std::cout << "Early customization is finished\n"; | |
ex::sync_wait( ex::on( schedB, work ) ); | |
// <tmp>: | |
// on(B, trace("A", transfer(A, then(hello, just())))) | |
/* | |
connect(on(B)) receiver's sched = B | |
connect(trace("A")) "connect then sender from domain A" | |
connect(transfer(A)) | |
connect(then(hello)) "hello from domain B transform_sender, late" | |
(using receiver's domain via scheduler) | |
connect(trace("B")) "connect then sender from domain B" | |
connect(just()) | |
operation state: | |
on(B, trace("A", transfer(A, trace("B", just())))) | |
start(on(B)) transitions execution to thread B | |
start(trace("A")) | |
start(transfer(A)) | |
start(trace("B")) | |
start(just()) | |
set_value(trace("B")) "then sender from domain B running on thread B" | |
set_value(transfer(A)) transitions execution to thread A | |
set_value(trace("A")) "then sender from domain A running on thread A" | |
set_value(on(B)) | |
*/ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment