Last active
December 9, 2021 00:33
-
-
Save kirkshoop/e2cd7e6449cb7ff5ff76dd3a801d3750 to your computer and use it in GitHub Desktop.
bulkandmain.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct prefix_scheduler_fn { | |
template<typename Predecessor> | |
struct _sender : ex::sender_base { | |
Predecessor p_; | |
template <std::execution::receiver_of R> | |
friend auto tag_invoke(ex::connect_t, _sender, R&& rec){ | |
auto prefix = p_ | | |
ex::let_value( | |
[sched = ex::get_scheduler(rec)](auto&&... vn){ | |
return ex::just(sched, (decltype(vn)&&)vn...); | |
}); | |
return ex::connect(prefix, rec); | |
} | |
}; | |
struct sender_fn { | |
template<typename Predecessor> | |
_sender<std::remove_cvref_t<Predecessor>> operator()(Predecessor&& p) const {return {(Predecessor&&)p};} | |
}; | |
sender_fn operator()() const {return {};} | |
}; | |
// Global function object; invoked as `prefix_scheduler()` inside sender pipelines.
inline constexpr prefix_scheduler_fn prefix_scheduler{};
auto update_all(scheduler auto pool, std::vector<X> xs) | |
{ | |
// this solution uses more memory to maximize parallelism | |
using pool_t = std::remove_cvref_t<decltype(pool)>; | |
using independent_result_t = decltype(independent_scan(xs[0])); | |
using dependent_result_t = decltype(dependent_scan(xs[0], independent_result_t[0])); | |
struct index_state { | |
std::size_t idx; | |
independent_result_t ops; | |
}; | |
// create some state | |
return ex::just(std::move(xs), std::vector<index_state>{}, async_scope{}) | | |
// uses get_scheduler(receiver) to get the main thread scheduler from sync_wait | |
prefix_scheduler() | | |
// keep that state alive for the entire update | |
ex::let_value([pool](auto& main, std::vector<X>& xs, std::vector<index_state>& states, async_scope& scope){ | |
using main_t = std::remove_cvref_t<decltype(main)>; | |
states.resize(xs.size()); | |
auto independent_one = [&xs, &states](std::size_t i) { | |
states[i].idx = i; | |
states[i].ops = independent_scan(xs[i]); | |
}; | |
auto bulk_independent = ex::just() | | |
ex::transfer(pool) | | |
ex::bulk(xs.size(), independent_one) | | |
then([](auto&&...){}); // dump any results | |
void dependent_one( | |
pool_t pool, | |
main_t& main, | |
std::vector<X>& xs, | |
std::vector<index_state>& states, | |
async_scope& scope, | |
std::size_t idx, | |
std::size_t i){ | |
// spawn dependent work so that scan[0] is followed by privileged[0] | |
// which is followed by another spawn of scan[1] followed by privileged[1] ... | |
scope.spawn(ex::just() | | |
ex::transfer(pool) | | |
ex::then([&xs, &states, &scope, idx, i]{ | |
return dependent_scan(xs[idx], states[idx].ops[i]); | |
}) | | |
ex::transfer(main) | | |
ex::then([&xs, &states, &scope, idx, i] (auto step) { | |
privileged_op(xs[idx], states[idx].ops[i], step); | |
if (i + 1 < states[idx].ops.size()) { | |
// spawn next | |
dependent_one(main, xs, states, scope, idx, i + 1); | |
} | |
})); | |
}; | |
auto bulk_dependent = | |
ex::let_value([pool, &main, &xs, &states, &scope](){ | |
for(auto& st : states) { | |
// spawn first for st.idx | |
dependent_one(pool, main, xs, states, scope, st.idx, 0); | |
} | |
// complete after all spawned items are complete | |
return scope.empty(); | |
}); | |
return bulk_independent | bulk_dependent; | |
}); | |
} | |
int main() { | |
a_thread_pool pool; | |
std::vector<X> data; | |
// sync_wait provides a scheduler for the main thread | |
sync_wait(update_all(pool.get_scheduler(), std::move(data))); | |
} |
You are correct, I misunderstood the spec :)
I have updated the gist, I think it matches your spec now - let me know.
Coroutines will usually be more ergonomic. The cons are that compilers still have some issues with coroutines (though coroutines are used in production), that not all environments have compilers that support coroutines at all, and that coroutines do not allow the level of control over allocations that sender/receiver (S/R) does.
It is also possible to create a single algorithm that establishes this structure and accepts the arguments needed to customize the independent, dependent, and privileged functions.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for going to the trouble to figure out this alternative!
`prefix_scheduler()` looks pretty clever -- I need to spend some time tomorrow to figure out exactly how it works.
In my scenario, the important thing about each
`dependent_scan()` is that it's dependent on the side effects of the preceding `privileged_op()`. So, supposing that the `independent_scan()` returns 3 items, then the following need to happen sequentially: `dependent_scan(0)`, `privileged_op(0)`, `dependent_scan(1)`, `privileged_op(1)`, `dependent_scan(2)`, `privileged_op(2)`.
From a brief reading, I think that the implementation in this gist currently tries to dispatch all of the
`dependent_scan()` calls for an item in parallel, and that will give the wrong results -- if it could work, then it wouldn't be `dependent_scan()` and could be rolled into the work that `independent_scan()` does!
One of my concerns is that in our application we have a lot of business logic that effectively takes this structure, and I'm worried that P2300 sender/receiver will need a lot of extra scaffolding built on top of it. Do you think that using coroutines could help make this more ergonomic?
I'll send a more complete message on lib-ext tomorrow once I'm confident that I've properly understood what's going on here.