
@kirkshoop
Last active December 9, 2021 00:33
bulkandmain.cpp
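// Written against the P2300 (std::execution) proposal drafts of the time;
// get_scheduler(receiver), sender_base and async_scope follow those drafts.
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>
#include <execution> // assumed: a P2300 implementation exposing std::execution
namespace ex = std::execution;

// prefix_scheduler(): a pipeable adaptor that prepends the receiver's scheduler
// (for example the run loop owned by sync_wait) to the values of the predecessor.
struct prefix_scheduler_fn {
  template <typename Predecessor>
  struct _sender : ex::sender_base { // untyped sender, as in the original sketch
    Predecessor p_;
    template <ex::receiver R>
    friend auto tag_invoke(ex::connect_t, _sender self, R&& rec) {
      // ask the downstream receiver which scheduler it wants work to run on
      auto sched = ex::get_scheduler(rec);
      auto prefix = std::move(self.p_) |
        ex::let_value([sched](auto&&... vn) {
          // send the scheduler first, followed by every value of the predecessor
          return ex::just(sched, (decltype(vn)&&)vn...);
        });
      return ex::connect(std::move(prefix), (R&&)rec);
    }
  };
  struct sender_fn {
    template <typename Predecessor>
    _sender<std::remove_cvref_t<Predecessor>> operator()(Predecessor&& p) const {
      return {{}, (Predecessor&&)p}; // empty base first, then p_
    }
    // enables `sndr | prefix_scheduler()`
    template <typename Predecessor>
    friend auto operator|(Predecessor&& p, sender_fn fn) {
      return fn((Predecessor&&)p);
    }
  };
  sender_fn operator()() const { return {}; }
};
inline constexpr prefix_scheduler_fn prefix_scheduler{};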
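// X, independent_scan, dependent_scan and privileged_op belong to the application;
// async_scope is assumed to provide spawn() and a sender that completes when empty.
auto update_all(ex::scheduler auto pool, std::vector<X> xs)
{
  // this solution uses more memory to maximize parallelism
  using pool_t = std::remove_cvref_t<decltype(pool)>;
  // independent_scan(x) returns an indexable range of ops that must be handled in order
  using independent_result_t = decltype(independent_scan(xs[0]));
  using dependent_result_t =
    decltype(dependent_scan(xs[0], std::declval<independent_result_t&>()[0]));
  struct index_state {
    std::size_t idx;          // index of the X this state belongs to
    independent_result_t ops; // result of independent_scan(xs[idx])
  };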
  // create some state (assumes async_scope can be moved into just())
  return ex::just(std::move(xs), std::vector<index_state>{}, async_scope{}) |
    // uses get_scheduler(receiver) to get the main-thread scheduler from sync_wait
    prefix_scheduler() |
    // let_value keeps this state alive for the entire update
    ex::let_value([pool](auto& main, std::vector<X>& xs,
                         std::vector<index_state>& states, async_scope& scope) {
      using main_t = std::remove_cvref_t<decltype(main)>;
      states.resize(xs.size());
      // phase 1: run independent_scan on every element in parallel on the pool
      auto independent_one = [&xs, &states](std::size_t i) {
        states[i].idx = i;
        states[i].ops = independent_scan(xs[i]);
      };
      auto bulk_independent = ex::just() |
        ex::transfer(pool) |
        ex::bulk(xs.size(), independent_one) |
        ex::then([](auto&&...) {}); // discard any results
      // phase 2: for each element, dependent_scan(op i) runs on the pool and is then
      // followed by privileged_op(op i) on the main thread. The work for op i + 1 is
      // only spawned after privileged_op(op i) completes, so the ops of one element run
      // in strict sequence while different elements proceed concurrently.
      // A named function cannot be defined here, so the recursion goes through a
      // lambda that is passed itself as `self`. `self` is copied into each
      // continuation because this local variable is gone before the spawned work runs.
      auto dependent_one = [pool, main, &xs, &states, &scope](
          const auto& self, std::size_t idx, std::size_t i) -> void {
        scope.spawn(ex::just() |
          ex::transfer(pool) |
          ex::then([&xs, &states, idx, i] {
            return dependent_scan(xs[idx], states[idx].ops[i]);
          }) |
          ex::transfer(main) |
          ex::then([self, &xs, &states, idx, i](auto step) {
            privileged_op(xs[idx], states[idx].ops[i], step);
            if (i + 1 < states[idx].ops.size()) {
              // spawn the next step for this element
              self(self, idx, i + 1);
            }
          }));
      };
      auto bulk_dependent =
        ex::let_value([dependent_one, &states, &scope]() {
          for (auto& st : states) {
            if (!st.ops.empty()) {
              // spawn the first step for st.idx
              dependent_one(dependent_one, st.idx, 0);
            }
          }
          // complete after all spawned items (and the items they spawn) are complete;
          // assumes async_scope::empty() returns such a sender
          return scope.empty();
        });
      return bulk_independent | bulk_dependent;
    });
}
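int main() {
  a_thread_pool pool; // placeholder: any thread pool that provides a P2300 scheduler
  std::vector<X> data;
  // sync_wait drives a run loop on the main thread and exposes its scheduler
  // through the receiver, which is what prefix_scheduler() picks up
  std::this_thread::sync_wait(update_all(pool.get_scheduler(), std::move(data)));
}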
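The mechanism behind prefix_scheduler() can be seen in isolation with a toy continuation-passing model. This is a minimal sketch, not the P2300 API: a "sender" here is just a callable that is handed a receiver, and `main_scheduler`, `just`, `prefixing_receiver`, `recording_receiver` and `demo_prefix_scheduler` are hypothetical names. The point it shows is that the adaptor asks the *downstream* receiver for its scheduler and prepends it to whatever the upstream sender produces.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Toy model (NOT P2300): a "sender" is a callable taking a receiver; it eventually
// calls receiver.set_value(...). A receiver can also report its scheduler.
struct main_scheduler {
  std::string name = "main";
};

// just(vs...): delivers copies of the stored values to the receiver it is given
template <typename... Vs>
auto just(Vs... vs) {
  return [=](auto&& rec) { rec.set_value(vs...); };
}

// Wraps the downstream receiver and prepends the scheduler to the value pack.
template <typename Rec, typename Sched>
struct prefixing_receiver {
  Rec& inner;
  Sched sched;
  template <typename... Vs>
  void set_value(Vs&&... vs) {
    inner.set_value(sched, std::forward<Vs>(vs)...);
  }
};

// prefix_scheduler(pred): the values of pred, with rec.get_scheduler() in front
template <typename Pred>
auto prefix_scheduler(Pred pred) {
  return [pred](auto& rec) {
    auto sched = rec.get_scheduler(); // plays the role of get_scheduler(rec)
    using rec_t = std::remove_reference_t<decltype(rec)>;
    pred(prefixing_receiver<rec_t, decltype(sched)>{rec, sched});
  };
}

// A consumer that, like sync_wait's receiver, knows which scheduler it runs on.
struct recording_receiver {
  std::string result;
  main_scheduler get_scheduler() const { return {}; }
  void set_value(main_scheduler s, int n, std::string text) {
    result = s.name + ":" + std::to_string(n) + ":" + text;
  }
};

std::string demo_prefix_scheduler() {
  recording_receiver rec;
  auto sndr = prefix_scheduler(just(42, std::string("xs")));
  sndr(rec); // "connect + start" collapsed into one call in this toy model
  return rec.result; // "main:42:xs"
}
```

In the gist, the same prepending is done by connecting `p_ | let_value(... just(sched, vn...))` to the receiver, so the real adaptor reuses existing algorithms instead of a hand-written wrapper receiver.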
@peter-b

peter-b commented Dec 8, 2021

Thanks for going to the trouble to figure out this alternative! prefix_scheduler() looks pretty clever -- I need to spend some time tomorrow to figure out exactly how it works.

In my scenario, the important thing about each dependent_scan() is that it depends on the side effects of the preceding privileged_op(). For example, if independent_scan() returns 3 items, then the following must happen sequentially: dependent_scan(0), privileged_op(0), dependent_scan(1), privileged_op(1), dependent_scan(2), privileged_op(2).
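As a point of reference, the required ordering for a single element can be written as a plain sequential loop. This is a minimal sketch: `element_log`, `run_dependent_chain` and the signatures given to `dependent_scan`/`privileged_op` are hypothetical stand-ins that log their calls; the real functions come from the application.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one element: records calls and holds the state that
// privileged_op mutates and the next dependent_scan reads.
struct element_log {
  std::vector<std::string> events;
  int state = 0; // written only by privileged_op
};

// Reads the side effects of every earlier privileged_op on this element.
int dependent_scan(element_log& x, std::size_t i) {
  x.events.push_back("dependent_scan(" + std::to_string(i) + ")");
  return x.state + 1;
}

void privileged_op(element_log& x, std::size_t i, int step) {
  x.events.push_back("privileged_op(" + std::to_string(i) + ")");
  x.state = step;
}

// Sequential reference for one element whose independent_scan produced n ops.
void run_dependent_chain(element_log& x, std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) {
    int step = dependent_scan(x, i); // may run on the pool
    privileged_op(x, i, step);       // must run on the main thread
  }
}
```

Any asynchronous version has to preserve exactly this per-element order, while remaining free to interleave the chains of different elements.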

From a brief reading, I think that the implementation in this gist currently tries to dispatch all of the dependent_scan() calls for an item in parallel, and that will give the wrong results -- if it could work, then it wouldn't be dependent_scan() and could be rolled into the work that independent_scan() does!

One of my concerns is that in our application we have a lot of business logic that effectively takes this structure, and I'm worried that P2300 sender/receiver will need a lot of extra scaffolding built on top of it. Do you think that using coroutines could help make this more ergonomic?

I'll send a more complete message on lib-ext tomorrow once I'm confident that I've properly understood what's going on here.

@kirkshoop
Author

You are correct, I misunderstood the spec :)
I have updated the gist; I think it matches your spec now. Let me know.

Coroutines will usually be more ergonomic. The cons: compilers still have some issues with coroutines (though coroutines are used in production), not all environments have compilers that support coroutines at all, and coroutines do not allow the level of control over allocations that S/R does.

It is also possible to create a single algorithm that establishes this structure and accepts the arguments needed to customize the independent, dependent and privileged functions.
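A rough shape of such a single algorithm, using only the C++ standard library instead of P2300, might look like the sketch below. `update_all_generic` is a hypothetical name. `std::async` stands in for the pool scheduler, and a mutex/condition-variable queue drained by the calling thread stands in for the sync_wait run loop, so every privileged call happens on the caller's thread. It assumes the independent function returns an indexable range and that the callables are safe to invoke concurrently.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename X, typename Independent, typename Dependent, typename Privileged>
void update_all_generic(std::vector<X>& xs, Independent independent,
                        Dependent dependent, Privileged privileged) {
  using ops_t = decltype(independent(xs[0]));
  std::vector<ops_t> ops(xs.size());

  // Phase 1: every independent scan runs in parallel.
  {
    std::vector<std::future<void>> fs;
    for (std::size_t k = 0; k < xs.size(); ++k)
      fs.push_back(std::async(std::launch::async, [&, k] { ops[k] = independent(xs[k]); }));
    for (auto& f : fs) f.get();
  }

  // Phase 2: dependent(i) on a worker thread, then privileged(i) on this thread.
  using step_t = decltype(dependent(xs[0], ops[0][0]));
  struct done { std::size_t idx, i; step_t step; };
  std::mutex m;
  std::condition_variable cv;
  std::queue<done> ready;          // finished dependent scans awaiting this thread
  std::vector<std::future<void>> workers;
  std::size_t outstanding = 0;     // launched but not yet consumed (this thread only)

  auto launch = [&](std::size_t idx, std::size_t i) {
    ++outstanding;
    workers.push_back(std::async(std::launch::async, [&, idx, i] {
      step_t step = dependent(xs[idx], ops[idx][i]);
      { std::lock_guard<std::mutex> lk(m); ready.push(done{idx, i, std::move(step)}); }
      cv.notify_one();
    }));
  };

  for (std::size_t idx = 0; idx < xs.size(); ++idx)
    if (!ops[idx].empty()) launch(idx, 0);

  while (outstanding > 0) {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [&] { return !ready.empty(); });
    done d = std::move(ready.front());
    ready.pop();
    lk.unlock();
    --outstanding;
    privileged(xs[d.idx], ops[d.idx][d.i], d.step); // always on the calling thread
    if (d.i + 1 < ops[d.idx].size()) launch(d.idx, d.i + 1); // next step only now
  }
  for (auto& f : workers) f.get();
}
```

The hand-written queue and counter are the kind of scaffolding that sender/receiver (or coroutines) would absorb into scheduler transitions and an async scope; the caller only supplies the three functions.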
