Created
January 15, 2024 20:21
-
-
Save Masstronaut/48393338de61acc17c980d807aeb5e76 to your computer and use it in GitHub Desktop.
A C++ implementation for resolving an acyclic (non-cyclical) graph of tasks, executing independent tasks in parallel.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <condition_variable>
#include <cstddef>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
// Unpacks a tuple of futures into the arguments of a callable:
// apply(f, tuple<future<A>, future<B>>) invokes f(a.get(), b.get()).
// Rewritten on top of C++14 std::index_sequence instead of the original
// hand-rolled recursive Apply<N> template, which instantiated one class
// per tuple element.
template<typename F, typename T, std::size_t... Is>
inline auto apply_impl(F & f, T & t, std::index_sequence<Is...>) {
  // .get() is called on every future; this blocks until each result is ready
  // and forwards the unwrapped values to the callable.
  return f(std::get<Is>(t).get()...);
}
// Public entry point: deduces the tuple size and dispatches to the
// index-sequence helper above. Interface unchanged from the original.
template<typename F, typename T>
inline auto apply(F & f, T & t) {
  return apply_impl(
      f, t,
      std::make_index_sequence<std::tuple_size<std::decay_t<T>>::value>{});
}
// Applies a callable to every element of a tuple, in declaration order.
// Rewritten on top of C++14 std::index_sequence instead of the original
// hand-rolled integer_sequence templates. The original expanded the calls
// as arguments to a variadic lambda, whose evaluation order is unspecified;
// the braced-init-list expander below guarantees left-to-right order.
template<typename T, typename F, std::size_t... Is>
void for_each_tuple_element_impl(T&& t, F f, std::index_sequence<Is...>){
  using expander = int[];
  // (expr, 0) turns each void call into an int; the array initializer forces
  // left-to-right evaluation. (void) silences the unused-value warning.
  (void)expander{0, (f(::std::get<Is>(t)), 0)...};
}
// Public entry point: visits each element of t with f, front to back.
// Interface unchanged from the original.
template<typename... Ts, typename F>
void for_each_tuple_element(::std::tuple<Ts...> &t, F f){
  for_each_tuple_element_impl(t, f, std::index_sequence_for<Ts...>{});
}
// Work-around for the fact that `task` cannot carry two independent
// parameter packs: make_task wraps a free-function predicate in this type,
// packaging its return type and argument types with the function pointer so
// `task` can recover them via the nested typedefs.
template<typename ReturnType, typename... Args>
struct predicate_wrapper{
  // Result type the wrapped function produces.
  using result_t = ReturnType;
  // Plain function-pointer type of the wrapped predicate.
  using predicate_t = ReturnType(*)(Args...);
  // packaged_task type `task` builds around the predicate.
  using packaged_task_t = std::packaged_task<result_t(Args...)>;
  // explicit: a bare function pointer should not silently convert into a
  // wrapper; make_predicate_wrapper constructs it deliberately.
  explicit predicate_wrapper(predicate_t pred) : predicate(pred) {}
  predicate_t predicate;
};
// This function is used to deduce the type parameters for the predicate_wrapper struct. | |
template<typename ReturnType, typename... Args> | |
auto make_predicate_wrapper(ReturnType(*pred)(Args...)){ | |
return predicate_wrapper<ReturnType, Args...>(pred); | |
} | |
// Counting semaphore built from a mutex and a condition variable.
// wait() blocks until the count is positive, then claims one slot;
// notify() releases a slot and wakes one waiter. Used by `task` to cap the
// number of tasks executing concurrently.
class semaphore{
public:
  explicit semaphore(int count = 0)
    : m_thread_count(count)
  {}
  // Non-copyable and non-movable: std::mutex and std::condition_variable
  // cannot be moved, and waiters hold a pointer to this object. The original
  // deleted `const semaphore&&` overloads, which never match real moves.
  semaphore(const semaphore&) = delete;
  semaphore(semaphore&&) = delete;
  semaphore& operator=(const semaphore&) = delete;
  semaphore& operator=(semaphore&&) = delete;
  // Block until a slot is available, then consume it.
  inline void wait() {
    std::unique_lock<std::mutex> lock(m_mutex);
    // predicate form handles spurious wakeups
    m_cv.wait(lock, [this]{ return m_thread_count > 0; });
    --m_thread_count;
  }
  // Release one slot and wake a single waiter.
  inline void notify() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      ++m_thread_count;
    }
    // notifying outside the lock avoids waking a thread that immediately
    // blocks on the still-held mutex
    m_cv.notify_one();
  }
private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  int m_thread_count{0};
};
template<typename Predicate, typename... Dependencies> | |
struct task{ | |
using predicate_t = typename Predicate::predicate_t; | |
using result_t = typename Predicate::result_t; | |
using task_t = typename Predicate::packaged_task_t; | |
using dependencies_t = std::tuple<Dependencies&...>; | |
using parameters_t = std::tuple<std::future<typename std::decay_t<Dependencies>::result_t>...>; | |
task(Predicate Task, Dependencies&... dependencies) | |
: m_params(std::move(dependencies.get_future())...) | |
, m_dependencies(std::forward<Dependencies>(dependencies)...) | |
, m_task(std::move(Task.predicate)) | |
{} | |
void start(int threads = 0){ | |
if(threads){ | |
m_semaphore = new semaphore(threads); | |
} | |
// recursively start each sub-task on another thread. | |
for_each_tuple_element(m_dependencies, [this](auto& f){ | |
std::async([this, &f]{ f.start_child(m_semaphore); }); | |
}); | |
// wait until all dependent tasks are done executing | |
for_each_tuple_element(m_params, [](auto& f){ | |
f.wait(); | |
}); | |
// at this point all of the sub-tasks should've been completed | |
// it should be safe to execute this task now, | |
// but you need to make sure it's okay for another thread to execute it's work | |
m_semaphore->wait(); | |
apply(m_task, m_params); | |
m_semaphore->notify(); | |
if(threads){ | |
delete m_semaphore; | |
} | |
} | |
std::future<result_t>&& get_future(){ | |
return std::move(m_task.get_future()); | |
} | |
void start_child(semaphore* sem){ | |
m_semaphore = sem; | |
start(); | |
} | |
semaphore* m_semaphore; | |
parameters_t m_params; | |
dependencies_t m_dependencies; | |
task_t m_task; | |
}; | |
template<typename Predicate, typename... Dependencies> | |
auto make_task1(Predicate pred, Dependencies&&... dependencies){ | |
return task<Predicate, Dependencies...>(pred, std::forward<Dependencies>(dependencies)...); | |
} | |
// Public factory: wraps a free-function predicate with its type information
// and creates a task node that depends on the given tasks.
template<typename Predicate, typename... Dependencies>
auto make_task(Predicate pred, Dependencies&&... dependencies){
  auto wrapped = make_predicate_wrapper(pred);
  return make_task1(wrapped, std::forward<Dependencies>(dependencies)...);
}
// Four distinct result types so the demo below exercises a heterogeneous
// dependency list: bar() receives one value of each type.
struct int1{ int value = 1; };
struct int2{ int value = 2; };
struct int3{ int value = 3; };
struct int4{ int value = 4; };
// Leaf tasks: no dependencies, each announces itself and returns its payload.
int1 foo1(){
  std::cout << "Executing 1.\n";
  return int1{};
}
int2 foo2(){
  std::cout << "Executing 2.\n";
  return int2{};
}
int3 foo3(){
  std::cout << "Executing 3.\n";
  return int3{};
}
int4 foo4(){
  std::cout << "Executing 4.\n";
  return int4{};
}
// Root predicate: consumes one result of each leaf type and sums them.
int bar(int1 i1, int2 i2, int3 i3, int4 i4){
  std::cout << "Executing bar.\n";
  return i1.value + i2.value + i3.value + i4.value;
}
// Squares its argument. Not used by the demo in main().
int pow_rec(int number){
  return number * number;
}
int main(){ | |
auto t1 = make_task(foo1); | |
auto t2 = make_task(foo2); | |
auto t3 = make_task(foo3); | |
auto t4 = make_task(foo4); | |
auto t5 = make_task(bar, t1, t2, t3, t4); | |
auto f = t5.get_future(); | |
t5.start(2); | |
std::cout << f.get() << std::endl; | |
return 0; | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.