Skip to content

Instantly share code, notes, and snippets.

@Masstronaut
Created February 26, 2017 01:39
Show Gist options
  • Save Masstronaut/8b52da335dbc724c8b3ceb2accc1b248 to your computer and use it in GitHub Desktop.
Save Masstronaut/8b52da335dbc724c8b3ceb2accc1b248 to your computer and use it in GitHub Desktop.
Enables programmers to break a large, complex task into smaller sub-tasks whose dependencies are resolved automatically and which can be executed concurrently.
#include <condition_variable>
#include <cstddef>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
// Expands the elements of a tuple into the argument list of a callable.
// This variant is specialised for tuples of futures: apply(f, tuple<future<int>, future<bool>>)
// ends up calling f(int, bool), where every argument is produced by calling .get()
// on the corresponding tuple element.
//
// Apply<Remaining> means "Remaining tuple elements still have to be unpacked".
// Every step appends the next element (front to back) to the argument pack and
// recurses with Remaining - 1 until Apply<0> performs the actual call.
template<size_t Remaining>
struct Apply {
    template<typename Callable, typename Tuple, typename... Unpacked>
    static inline auto apply(Callable & callable, Tuple & elements, Unpacked &... unpacked) {
        // Index of the element to peel off in this step: 0 first, size - 1 last.
        constexpr size_t total = ::std::tuple_size< ::std::decay_t<Tuple> >::value;
        auto & next = ::std::get<total - Remaining>(elements);
        return Apply<Remaining - 1>::apply(callable, elements, unpacked..., next);
    }
};

// Recursion end: every element has been unpacked into `ready`.
template<>
struct Apply<0> {
    template<typename Callable, typename Tuple, typename... Unpacked>
    static inline auto apply(Callable & callable, Tuple &, Unpacked &... ready) {
        // Invoke the callable with the result of .get() on each unpacked element.
        return callable(ready.get()...);
    }
};

// Entry point: calls f with the .get() result of every element of t, in tuple order.
// Both arguments are taken by non-const lvalue reference.
template<typename Callable, typename Tuple>
inline auto apply(Callable & f, Tuple & t) {
    using unpacker = Apply< ::std::tuple_size< ::std::decay_t<Tuple> >::value >;
    return unpacker::apply(f, t);
}
// integer_sequence carries a compile-time list of integers.
// The integers are used as indices for visiting the elements of a tuple.
template<int... Ints>
struct integer_sequence{};

// generate_integer_sequence<N> derives from integer_sequence<0, 1, ..., N-1>.
// Every level of inheritance prepends N-1 to the pack until N reaches 0.
template<int N, int... Ints>
struct generate_integer_sequence : generate_integer_sequence<N-1, N-1, Ints...>{};
template <int... Ints>
struct generate_integer_sequence<0, Ints...> : integer_sequence<Ints...>{};

// Calls f(std::get<I>(t)) for every index I in Ints, strictly in the order the
// indices are listed. The result of f, if any, is discarded.
template<typename T, typename F, int... Ints>
void for_each_tuple_element_impl(T&& t, F f, integer_sequence<Ints...>){
    // The calls are expanded inside a braced initializer list because its elements are
    // evaluated left to right; expanding them as function-call arguments would leave the
    // visiting order unspecified. The leading 0 keeps the array non-empty for empty
    // tuples, and the inner void cast keeps an overloaded comma operator from being picked.
    using expand = int[];
    static_cast<void>(expand{0, (static_cast<void>(f(::std::get<Ints>(t))), 0)...});
}

// Visits every element of the tuple t in index order (0, 1, 2, ...), passing each
// element to f by reference so that f may modify it. Does nothing for an empty tuple.
template<typename... Ts, typename F>
void for_each_tuple_element(::std::tuple<Ts...> &t, F f){
    for_each_tuple_element_impl(t, f, generate_integer_sequence<sizeof...(Ts)>());
}
// Workaround for the fact that task cannot take two separate parameter packs.
// make_task wraps the predicate in this type so that its return type and argument
// types travel together with the function pointer; task then reads them back
// through the member aliases below.
template<typename ReturnType, typename... Args>
struct predicate_wrapper{
    // Type produced by the predicate.
    using result_t = ReturnType;
    // Plain function pointer type of the predicate.
    using predicate_t = result_t(*)(Args...);
    // packaged_task type able to run the predicate and publish its result through a future.
    using packaged_task_t = std::packaged_task<ReturnType(Args...)>;

    // The wrapped function pointer.
    predicate_t predicate;

    // Intentionally implicit, as in the original interface.
    predicate_wrapper(predicate_t pred) : predicate{pred} {}
};
// This function is used to deduce the type parameters for the predicate_wrapper struct.
template<typename ReturnType, typename... Args>
auto make_predicate_wrapper(ReturnType(*pred)(Args...)){
return predicate_wrapper<ReturnType, Args...>(pred);
}
// Counting semaphore built from a mutex and a condition variable.
// The counter is the number of wait() calls that can currently succeed without blocking.
class semaphore{
public:
    // count: initial value of the counter (0 means the first wait() blocks until a notify()).
    semaphore(int count = 0)
    : m_thread_count(count)
    {}

    // The object can be neither copied nor moved.
    semaphore(const semaphore&) = delete;
    semaphore(semaphore&&) = delete;
    semaphore& operator=(const semaphore&) = delete;
    semaphore& operator=(semaphore&&) = delete;

    // Blocks the caller until the counter is positive, then decrements it.
    inline void wait() {
        std::unique_lock<std::mutex> guard(m_mutex);
        // Loop so that spurious wake-ups and lost races simply go back to sleep.
        while(m_thread_count <= 0){
            m_cv.wait(guard);
        }
        m_thread_count -= 1;
    }

    // Increments the counter and wakes one waiting thread, if there is one.
    inline void notify() {
        std::lock_guard<std::mutex> guard(m_mutex);
        ++m_thread_count;
        m_cv.notify_one();
    }

private:
    std::mutex m_mutex;            // protects m_thread_count
    std::condition_variable m_cv;  // signalled by notify()
    int m_thread_count{0};
};
template<typename Predicate, typename... Dependencies>
struct task{
using predicate_t = typename Predicate::predicate_t;
using result_t = typename Predicate::result_t;
using task_t = typename Predicate::packaged_task_t;
using dependencies_t = std::tuple<Dependencies&...>;
using parameters_t = std::tuple<std::future<typename std::decay_t<Dependencies>::result_t>...>;
task(Predicate Task, Dependencies&... dependencies)
: m_params(std::move(dependencies.get_future())...)
, m_dependencies(std::forward<Dependencies>(dependencies)...)
, m_task(std::move(Task.predicate))
{}
void start(int threads = 0){
if(threads){
m_semaphore = new semaphore(threads);
}
// recursively start each sub-task on another thread.
for_each_tuple_element(m_dependencies, [this](auto& f){
std::async([this, &f]{ f.start_child(m_semaphore); });
});
// wait until all dependent tasks are done executing
for_each_tuple_element(m_params, [](auto& f){
f.wait();
});
// at this point all of the sub-tasks should've been completed
// it should be safe to execute this task now,
// but you need to make sure it's okay for another thread to execute it's work
m_semaphore->wait();
apply(m_task, m_params);
m_semaphore->notify();
if(threads){
delete m_semaphore;
}
}
std::future<result_t>&& get_future(){
return std::move(m_task.get_future());
}
void start_child(semaphore* sem){
m_semaphore = sem;
start();
}
semaphore* m_semaphore;
parameters_t m_params;
dependencies_t m_dependencies;
task_t m_task;
};
namespace detail {
// This inner make_task is used so that the first call can wrap the predicate in a wrapper class.
// The wrapper class exposes some information about the predicate so it is easier to work with.
template<typename Predicate, typename... Dependencies>
auto make_task_impl(Predicate pred, Dependencies&&... dependencies){
return task<Predicate, Dependencies...>(pred, std::forward<Dependencies>(dependencies)...);
}
}
// Creates a task that executes pred once the given dependencies have produced their
// results. pred is first wrapped with make_predicate_wrapper; the wrapper and the
// dependencies are then handed to detail::make_task_impl, which builds the task.
template<typename Predicate, typename... Dependencies>
auto make_task(Predicate pred, Dependencies&&... dependencies){
    auto wrapped = make_predicate_wrapper(pred);
    return detail::make_task_impl(wrapped, std::forward<Dependencies>(dependencies)...);
}
// Example usage is below
// Four distinct result types for the example. Each one wraps an int whose default
// value matches the number in the type's name.
struct int1{
    int value = 1;
};
struct int2{
    int value = 2;
};
struct int3{
    int value = 3;
};
struct int4{
    int value = 4;
};
// Leaf predicates of the example: each one announces itself on stdout and returns a
// default-constructed value of its own result type.
int1 foo1() {
    std::cout << "Executing 1.\n";
    return {};
}
int2 foo2() {
    std::cout << "Executing 2.\n";
    return {};
}
int3 foo3() {
    std::cout << "Executing 3.\n";
    return {};
}
int4 foo4() {
    std::cout << "Executing 4.\n";
    return {};
}

// Root predicate of the example: announces itself and returns the sum of the four inputs.
int bar(int1 i1, int2 i2, int3 i3, int4 i4){
    std::cout << "Executing bar.\n";
    const int total = i1.value + i2.value + i3.value + i4.value;
    return total;
}
// Example: four independent leaf tasks feed their results into one root task.
int main(){
    // Leaf tasks have no dependencies.
    auto leaf1 = make_task(foo1);
    auto leaf2 = make_task(foo2);
    auto leaf3 = make_task(foo3);
    auto leaf4 = make_task(foo4);

    // The root task receives the leaves' results as the arguments of bar, in this order.
    auto root = make_task(bar, leaf1, leaf2, leaf3, leaf4);

    // Fetch the future before starting so the result can be read afterwards.
    auto result = root.get_future();

    const int num_threads{ 2 };
    root.start( num_threads );

    std::cout << result.get() << std::endl;
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment