Skip to content

Instantly share code, notes, and snippets.

@ericniebler
Created November 8, 2023 08:24
Show Gist options
  • Save ericniebler/3c810343d153af4c0edaeb4f4751bc39 to your computer and use it in GitHub Desktop.
Save ericniebler/3c810343d153af4c0edaeb4f4751bc39 to your computer and use it in GitHub Desktop.
// A demonstration of how execution domains ensure that the correct sender algorithm implementation is selected.
#include <iostream>
#include "stdexec/execution.hpp"
#include "exec/env.hpp"
namespace ex = stdexec;
using namespace std::literals;
// Satisfied when Sender models ex::sender and the tag type reported for it by
// ex::tag_of_t is exactly Tag. The domains in this file use it to constrain
// transform_sender to ex::then_t senders.
template <class Sender, class Tag>
concept sender_for =
ex::sender<Sender> && std::same_as<ex::tag_of_t<Sender>, Tag>;
// Satisfied when From, after std::decay_t (references and cv-qualifiers
// stripped), is exactly To. Used to write a single customization that accepts
// an object of type To with any cv/ref qualification.
template <class From, class To>
concept decays_to =
std::same_as<std::decay_t<From>, To>;
// The type produced by invoking the callable object referenced by Fun with
// arguments of types Args... .
template <const auto& Fun, class... Args>
using result_of = typename std::invoke_result<decltype(Fun), Args...>::type;
template <class From, class To>
using copy_cvref_t = stdexec::__copy_cvref_t<From, To>;
template <class Tag, class Data, class... Child>
auto make_sender(Tag, Data data, Child... child) {
return stdexec::__make_sexpr<Tag>(std::move(data), std::move(child)...);
}
//////////////////////////////////////////////////////////////
// a scheduler adaptor that gives it a particular domain
//
// with_domain<Scheduler, Domain> wraps a Scheduler and:
//   * answers ex::get_domain with a default-constructed Domain, and
//   * answers ex::schedule with a sender that wraps the underlying
//     scheduler's schedule sender and whose environment reports this adaptor
//     as the set_value completion scheduler and Domain as the domain.
template <class Scheduler, class Domain>
struct with_domain {
  // Takes ownership of the scheduler to adapt.
  explicit with_domain(Scheduler sched)
    : sched_(std::move(sched)) {}

  // Two adaptors are equal when their wrapped schedulers are equal.
  bool operator==(const with_domain&) const = default;

 private:
  // The sender returned by ex::schedule(with_domain).
  struct sender_ {
    using is_sender = void;

   private:
    friend with_domain;

    // Only with_domain creates these (see the schedule customization below).
    sender_(with_domain sched, ex::schedule_result_t<Scheduler> sndr)
      : sched_(std::move(sched)), sndr_(std::move(sndr)) {}

    // Completes exactly the way the underlying schedule sender does.
    template <class Env>
    friend auto tag_invoke(ex::get_completion_signatures_t, sender_, Env) noexcept
      -> ex::completion_signatures_of_t<ex::schedule_result_t<Scheduler>, Env> {
      return {};
    }

    // Connects the underlying schedule sender to the receiver. Both are taken
    // by value, so they are moved on: this matches the declared return type
    // (computed for an rvalue sender and rvalue receiver) and does not require
    // the receiver to be copyable.
    template <class Receiver>
    friend auto tag_invoke(ex::connect_t, sender_ self, Receiver rcvr)
      -> ex::connect_result_t<ex::schedule_result_t<Scheduler>, Receiver> {
      return ex::connect(std::move(self.sndr_), std::move(rcvr));
    }

    // Combines the underlying sender's environment with two extra queries:
    // the set_value completion scheduler (this adaptor) and the domain.
    friend auto tag_invoke(ex::get_env_t, sender_ self) noexcept {
      return exec::make_env(
        ex::get_env(self.sndr_),
        exec::make_env(
          exec::with(ex::get_completion_scheduler<ex::set_value_t>, self.sched_),
          exec::with(ex::get_domain, Domain())));
    }

    with_domain sched_;
    ex::schedule_result_t<Scheduler> sndr_;
  };

  // The adaptor's domain is always a default-constructed Domain.
  friend Domain tag_invoke(ex::get_domain_t, with_domain) noexcept {
    return Domain();
  }

  // Scheduling wraps the underlying scheduler's schedule sender.
  friend sender_ tag_invoke(ex::schedule_t, with_domain self) noexcept {
    return {self, ex::schedule(self.sched_)};
  }

  Scheduler sched_;
};
template <class Domain, class Scheduler>
auto make_sched_with_domain(Scheduler sched) {
return with_domain<Scheduler, Domain>{sched};
}
// Per-thread label for the current thread; empty until a thread assigns it
// (thread_context does so at the start of its worker thread).
thread_local std::string thread_name;
//////////////////////////////////////////////////////////////
// An execution context that owns an ex::run_loop and one thread running it.
// The worker thread records its label in thread_name before entering the
// loop. Schedulers handed out by get_scheduler() carry the Domain.
template <class Domain>
struct thread_context {
  // Starts the worker thread, which labels itself with `name` and runs the loop.
  explicit thread_context(std::string name)
    : thr_([this, name] {
        thread_name = name;
        loop_.run();
      }) {}

  // Asks the loop to finish and waits for the worker thread to exit.
  ~thread_context() {
    finish();
    thr_.join();
  }

  // Asks the run loop to finish.
  void finish() {
    loop_.finish();
  }

  // The run loop's scheduler, adapted to report Domain as its domain.
  auto get_scheduler() {
    auto loop_scheduler = loop_.get_scheduler();
    return make_sched_with_domain<Domain>(loop_scheduler);
  }

 private:
  // loop_ is declared before thr_ so it is constructed before the thread
  // that uses it starts.
  ex::run_loop loop_;
  std::thread thr_;
};
// The tag used for the trace sender: an empty type stored as the first member
// of trace_then_sender.
struct trace_then_t {};
template <class Receiver, class Fun>
struct trace_then_receiver {
using is_receiver = void;
Receiver rcvr;
Fun fun;
std::string name;
friend void tag_invoke(ex::set_value_t, trace_then_receiver&& self) noexcept {
std::cout << "then sender from domain " << self.name << '\n';
self.fun();
ex::set_value(std::move(self.rcvr));
}
template <class Error>
friend void tag_invoke(ex::set_error_t, trace_then_receiver&& self, Error&& e) noexcept {
ex::set_error(std::move(self.rcvr), (Error&&) e);
}
friend void tag_invoke(ex::set_stopped_t, trace_then_receiver&& self) noexcept {
ex::set_stopped(std::move(self.rcvr));
}
friend decltype(auto) tag_invoke(ex::get_env_t, const trace_then_receiver& self) noexcept {
return ex::get_env(self.rcvr);
}
};
template <class Receiver, class Fun>
trace_then_receiver(Receiver, Fun, std::string) -> trace_then_receiver<Receiver, Fun>;
// A "then"-like sender that traces where it is connected and run.
//
// It wraps a child sender, a callable and a domain label. Its environment and
// completion signatures are those of the child. Connecting it prints a trace
// line and connects the child to a trace_then_receiver that carries the
// callable and the label.
template <class Sender, class Fun>
struct trace_then_sender {
  using is_sender = void;

  // Each argument is a by-value sink that is moved into its member.
  trace_then_sender(Sender sndr, Fun fun, std::string name)
    : sndr(std::move(sndr))
    , fun(std::move(fun))
    , name(std::move(name))
  {}

  [[no_unique_address]] trace_then_t tag;
  Sender sndr;       // child sender
  Fun fun;           // callable handed to the receiver at connect time
  std::string name;  // domain label printed in the trace

  // Environment queries are answered by the child sender.
  friend decltype(auto) tag_invoke(ex::get_env_t, const trace_then_sender& self) noexcept {
    return ex::get_env(self.sndr);
  }

  // Reports the child's completion signatures unchanged.
  template <decays_to<trace_then_sender> Self, class Env>
    requires ex::sender_in<copy_cvref_t<Self, Sender>, Env>
  friend decltype(auto) tag_invoke(ex::get_completion_signatures_t, Self&& self, const Env& env) noexcept {
    return ex::get_completion_signatures(((Self&&) self).sndr, env);
  }

  // Prints the trace line, then connects the child to a trace_then_receiver.
  // The child, the callable and the label are all forwarded with the value
  // category of `self`: an rvalue sender gives up its members by move, while
  // an lvalue sender is left intact and its members are copied.
  template <decays_to<trace_then_sender> Self, class Receiver>
    requires ex::sender_to<copy_cvref_t<Self, Sender>, Receiver>
  friend decltype(auto) tag_invoke(ex::connect_t, Self&& self, Receiver rcvr) {
    // The label is read here, before it can be moved from below.
    std::cout << "connect then sender from domain " << self.name
              << " on thread " << thread_name << '\n';
    return ex::connect(
      ((Self&&) self).sndr,
      trace_then_receiver{std::move(rcvr), ((Self&&) self).fun, ((Self&&) self).name});
  }
};

// Lets `trace_then_sender(sndr, fun, name)` deduce its template arguments.
template <class Sender, class Fun>
trace_then_sender(Sender, Fun, std::string) -> trace_then_sender<Sender, Fun>;
struct domainA {
template <sender_for<ex::then_t> Sender, class... Env>
auto transform_sender(Sender&& sndr, const Env&... env) const {
std::cout << "hello from domain A transform_sender, "
<< (sizeof...(env) ? "late" : "early")
<< '\n';
auto [tag, fun, child] = (Sender&&) sndr;
return trace_then_sender(std::move(child), std::move(fun), "A");
}
};
struct domainB {
template <sender_for<ex::then_t> Sender, class... Env>
auto transform_sender(Sender&& sndr, const Env&... env) const {
std::cout << "hello from domain B transform_sender, "
<< (sizeof...(env) ? "late" : "early")
<< '\n';
auto [tag, fun, child] = (Sender&&) sndr;
return trace_then_sender(std::move(child), std::move(fun), "B");
}
};
int main() {
thread_context<domainA> ctxA("thread A");
thread_context<domainB> ctxB("thread B");
auto schedA = ctxA.get_scheduler();
auto schedB = ctxB.get_scheduler();
auto hello = [] { std::cout << " running on thread " << thread_name << '\n'; };
auto work =
ex::just() // no domain here
| ex::then(hello) // no domain here
| ex::transfer(schedA) // transition into domain A
| ex::then(hello); // "hello from domain A transform_sender, early"
// (using predecessor's domain via completion scheduler)
// work:
// trace("A", transfer(A, then(hello, just())))
std::cout << "Early customization is finished\n";
ex::sync_wait( ex::on( schedB, work ) );
// <tmp>:
// on(B, trace("A", transfer(A, then(hello, just()))))
/*
connect(on(B)) receiver's sched = B
connect(trace("A")) "connect then sender from domain A"
connect(transfer(A))
connect(then(hello)) "hello from domain B transform_sender, late"
(using receiver's domain via scheduler)
connect(trace("B")) "connect then sender from domain B"
connect(just())
operation state:
on(B, trace("A", transfer(A, trace("B", just()))))
start(on(B)) transitions execution to thread B
start(trace("A"))
start(transfer(A))
start(trace("B"))
start(just())
set_value(trace("B")) "then sender from domain B running on thread B"
set_value(transfer(A)) transitions execution to thread A
set_value(trace("A")) "then sender from domain A running on thread A"
set_value(on(B))
*/
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment