@kirkshoop
Last active December 9, 2021 00:33
bulkandmain.cpp
// assumes an early-P2300-style sender/receiver implementation with
//   namespace ex = std::execution;
// and with X, async_scope, independent_scan, dependent_scan and
// privileged_op defined elsewhere
struct prefix_scheduler_fn {
  template <typename Predecessor>
  struct _sender : ex::sender_base {
    Predecessor p_;
    template <ex::receiver R>
    friend auto tag_invoke(ex::connect_t, _sender s, R&& rec) {
      // query the receiver for the consumer's scheduler and prepend it
      // to the predecessor's values
      auto prefix = std::move(s.p_) |
        ex::let_value(
          [sched = ex::get_scheduler(rec)](auto&&... vn) {
            return ex::just(sched, (decltype(vn)&&)vn...);
          });
      return ex::connect(std::move(prefix), (R&&)rec);
    }
  };
  struct sender_fn {
    template <typename Predecessor>
    _sender<std::remove_cvref_t<Predecessor>> operator()(Predecessor&& p) const {
      return {(Predecessor&&)p};
    }
  };
  sender_fn operator()() const { return {}; }
};
constexpr inline prefix_scheduler_fn prefix_scheduler{};
auto update_all(ex::scheduler auto pool, std::vector<X> xs)
{
  // this solution uses more memory to maximize parallelism
  using pool_t = std::remove_cvref_t<decltype(pool)>;
  using independent_result_t = decltype(independent_scan(xs[0]));
  using dependent_result_t =
    decltype(dependent_scan(xs[0], std::declval<independent_result_t&>()[0]));
  struct index_state {
    std::size_t idx;
    independent_result_t ops;
  };
  // create some state
  return ex::just(std::move(xs), std::vector<index_state>{}, async_scope{}) |
    // uses get_scheduler(receiver) to get the main thread scheduler from sync_wait
    prefix_scheduler() |
    // keep that state alive for the entire update
    ex::let_value([pool](auto& main, std::vector<X>& xs, std::vector<index_state>& states, async_scope& scope) {
      using main_t = std::remove_cvref_t<decltype(main)>;
      states.resize(xs.size());
      auto independent_one = [&xs, &states](std::size_t i) {
        states[i].idx = i;
        states[i].ops = independent_scan(xs[i]);
      };
      auto bulk_independent = ex::just() |
        ex::transfer(pool) |
        ex::bulk(xs.size(), independent_one) |
        ex::then([](auto&&...){}); // discard any results
      // a local struct rather than a lambda, so the call can recurse
      struct dependent_one_fn {
        void operator()(
            pool_t pool,
            main_t main,
            std::vector<X>& xs,
            std::vector<index_state>& states,
            async_scope& scope,
            std::size_t idx,
            std::size_t i) const {
          // spawn dependent work so that scan[0] is followed by privileged[0],
          // which is followed by another spawn of scan[1] followed by privileged[1], ...
          scope.spawn(ex::just() |
            ex::transfer(pool) |
            ex::then([&xs, &states, idx, i]{
              return dependent_scan(xs[idx], states[idx].ops[i]);
            }) |
            ex::transfer(main) |
            ex::then([pool, main, &xs, &states, &scope, idx, i](auto step) {
              privileged_op(xs[idx], states[idx].ops[i], step);
              if (i + 1 < states[idx].ops.size()) {
                // spawn the next step of this chain
                dependent_one_fn{}(pool, main, xs, states, scope, idx, i + 1);
              }
            }));
        }
      };
      auto bulk_dependent =
        ex::let_value([pool, main, &xs, &states, &scope](){
          for (auto& st : states) {
            // spawn the first step for st.idx
            dependent_one_fn{}(pool, main, xs, states, scope, st.idx, 0);
          }
          // complete after all spawned items are complete
          return scope.empty();
        });
      return bulk_independent | bulk_dependent;
    });
}
int main() {
  a_thread_pool pool;
  std::vector<X> data;
  // sync_wait provides a scheduler for the main thread
  sync_wait(update_all(pool.get_scheduler(), std::move(data)));
}
@kirkshoop

You are correct, I misunderstood the spec :)
I have updated the gist; I think it matches your spec now. Let me know.

Coroutines will usually be more ergonomic. The cons are that compilers still have some issues with coroutines (though coroutines are used in production), that not all environments have compilers that support coroutines at all, and that coroutines do not allow the level of control over allocations that sender/receiver does.

It is also possible to create a single algorithm that establishes this structure and accepts the arguments needed to customize the independent, dependent, and privileged functions.
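A sketch of what such an algorithm's shape could look like, using plain `std::async` instead of senders so it stands alone (all names are hypothetical, and the per-element chains run sequentially here rather than being spawned concurrently as in the gist):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

// Hypothetical generic driver: runs `independent` on every element in
// parallel, then for each element alternates `dependent` (worker thread)
// and `privileged` (the calling thread), one step at a time.
template <typename X, typename Ind, typename Dep, typename Priv>
void update_all_with(std::vector<X>& xs, Ind independent, Dep dependent, Priv privileged) {
  // independent phase: one async task per element (stand-in for ex::bulk)
  std::vector<std::future<decltype(independent(xs[0]))>> ops;
  for (auto& x : xs)
    ops.push_back(std::async(std::launch::async, independent, std::ref(x)));

  // dependent phase: each step computed on a worker, applied on this thread
  for (std::size_t idx = 0; idx < xs.size(); ++idx) {
    auto steps = ops[idx].get();
    for (auto& op : steps) {
      auto step = std::async(std::launch::async, dependent, std::ref(xs[idx]), op).get();
      privileged(xs[idx], op, step);  // runs on the calling ("main") thread
    }
  }
}
```

The three callables correspond to `independent_scan`, `dependent_scan`, and `privileged_op` in the gist; a sender/receiver version would additionally take the pool scheduler and recover the concurrency across chains via `async_scope`.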
