Skip to content

Instantly share code, notes, and snippets.

@badair
Last active July 12, 2017 01:42
Show Gist options
  • Save badair/8118b98104df49d40a410cda19495166 to your computer and use it in GitHub Desktop.
Save badair/8118b98104df49d40a410cda19495166 to your computer and use it in GitHub Desktop.
flow graph drawn at compile-time
/*<-
Copyright Barrett Adair 2016-2017
All rights reserved
->*/
// need to add support for per-node concurrecy limits
// need to add support for multifunction nodes by using
// boost::callable_traits to inspect the 2nd parameter type
// add support for a variant dispatch node
// and other kinds of nodes
#include <type_traits>
#include <tuple>
#include <functional>
#include <utility>
//#include <boost/callable_traits.hpp>
#include <boost/hana.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
namespace hana = boost::hana;
// TODO this shouldn't be necessary in Hana
template<typename T>
struct type_v {
T value;
template<typename U>
constexpr auto equal(U) const {
return std::is_same<T, U>{};
}
};
// deduction guide
template<typename T> type_v(T t) -> type_v<T>;
// todo optimize
inline auto make_adjacency_list = [](auto... edges){
return hana::fold_left(
hana::make_tuple(edges...),
hana::make_map(),
[](auto map, auto edge){
auto key_ptr = hana::first(edge.pair);
auto key = hana::type_c<typename decltype(edge)::ltype>;
auto value_ptr = hana::second(edge.pair);
if constexpr(decltype(hana::contains(hana::keys(map), key))::value) {
auto value = map[key];
auto vals = hana::second(value);
auto new_map = hana::erase_key(map, key);
return hana::insert(new_map,
hana::make_pair(key,
hana::make_pair(key_ptr,
hana::insert(vals, value_ptr))));
}
else {
return hana::insert(map,
hana::make_pair(key,
hana::make_pair(key_ptr,
hana::make_set(value_ptr))));
}
});
};
template<typename L, typename R>
struct edge {
using ltype = L;
using rtype = R;
hana::pair<type_v<L*>, type_v<R*>> pair;
template<typename T, typename U>
edge(T &&t, U &&u)
: pair(type_v{&t}, type_v{&u})
{}
};
// deduction guide
template<typename L, typename R>
edge(L &l, R &r) -> edge<L, R>;
struct thread_pool {
thread_pool(size_t threads)
: service(),
working(new decltype(working)::element_type(service))
{
while(threads--) {
g.create_thread([this]{
this->service.run();
});
}
}
template<class F>
void enqueue(F &&f){
service.post(static_cast<F&&>(f));
}
void stop() {
working.reset();
g.join_all();
service.stop();
}
~thread_pool() {
stop();
}
private:
boost::asio::io_service service;
std::unique_ptr<boost::asio::io_service::work> working;
boost::thread_group g;
};
template<typename... Edges>
struct graph {
thread_pool pool{boost::thread::hardware_concurrency()};
decltype(make_adjacency_list(std::declval<Edges>()...)) adj;
graph(Edges... edges)
: adj(make_adjacency_list(edges...))
{}
template<typename Node, typename... Args>
void put(Node &n, Args &&... args) {
if constexpr(decltype(hana::contains(hana::keys(adj), hana::type_c<Node>))::value) {
auto &node_adj = adj[hana::type_c<Node>];
auto lptr = hana::first(node_adj);
pool.enqueue([=]{
if constexpr(std::is_same_v<decltype(lptr.value->operator()(std::move(args)...)), void>) {
hana::for_each(hana::second(node_adj), [&](auto descendant){
this->put(*descendant.value);
});
}
else {
auto result = lptr.value->operator()(std::move(args)...);
hana::for_each(hana::second(node_adj), [result, this](auto descendant){
this->put(*descendant.value, std::move(result));
});
}
});
return;
}
else /*leaf node*/ {
pool.enqueue([=, &n]{
n(std::move(args)...);
});
return;
}
}
void stop() {
pool.stop();
}
};
// deduction guide
template<typename... Edges>
graph(Edges... edges) -> graph<Edges...>;
#include <iostream>
int main() {
std::atomic<int> call_count = 0;
auto node_1 = [](auto x) { return x + 0.0; };
auto node_2 = [&](auto x) { return x * 2; ++call_count; };
auto node_3 = [&](auto x) { std::cout << x << std::endl; ++call_count; };
// precondition - each node must be a unique type
auto g = graph{
edge{node_1, node_2},
edge{node_1, node_3},
edge{node_2, node_3}
};
for(int i = 0; i < 1000000; ++i) {
g.put(node_1, i);
}
g.stop();
std::cout << "finished, called " << call_count << " times\n";
}
@badair
Copy link
Author

badair commented Jul 12, 2017

Now I just need a graph-aware scheduler :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment