Skip to content

Instantly share code, notes, and snippets.

@jamboree
Last active March 6, 2017 08:27
Show Gist options
  • Save jamboree/66cd7106b1b2338be4ea40b55316a929 to your computer and use it in GitHub Desktop.
Save jamboree/66cd7106b1b2338be4ea40b55316a929 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <thread>
#include <co2/task.hpp>
#include <co2/blocking.hpp>
#include <boost/asio/io_service.hpp>
#include <deque>
#include <ppl.h>
#include <tbb/task_group.h>
struct lifo_queue
{
co2::coroutine_handle last = nullptr;
void post(co2::coroutine<>& coro)
{
auto h = coro.detach();
co2::coroutine_data(h) = last;
last = h;
}
void run()
{
while (last)
{
co2::coroutine<> coro{last};
last = static_cast<co2::coroutine_handle>(co2::coroutine_data(last));
coro();
}
}
bool await_ready() const
{
return false;
}
void await_suspend(co2::coroutine<>& c)
{
post(c);
}
void await_resume() const {}
};
struct fifo_queue
{
co2::coroutine_handle first = nullptr;
co2::coroutine_handle* p = &first;
void post(co2::coroutine<>& coro)
{
auto h = coro.detach();
auto& next = co2::coroutine_data(h) = nullptr;
*p = h;
p = reinterpret_cast<co2::coroutine_handle*>(&next);
}
void run()
{
while (first)
{
co2::coroutine<> coro{first};
first = static_cast<co2::coroutine_handle>(co2::coroutine_data(first));
if (!first)
p = &first;
coro();
}
}
bool await_ready() const
{
return false;
}
void await_suspend(co2::coroutine<>& c)
{
post(c);
}
void await_resume() const {}
};
template<class Scheduler>
auto skynet(Scheduler& sched, std::int64_t num, std::int64_t size, std::int64_t div)
CO2_BEG(co2::task<std::int64_t>, (sched, num, size, div),
using Sums = std::vector<co2::task<std::int64_t>>;
Sums sums;
std::int64_t sum = 0;
Sums::iterator it, end;
)
{
if (size == 1)
CO2_RETURN(num);
CO2_AWAIT(sched);
size /= div;
sums.reserve(div);
for (boost::int64_t i = 0; i != div; ++i)
{
boost::int64_t sub_num = num + i * size;
sums.push_back(skynet(sched, sub_num, size, div));
}
it = sums.begin();
end = sums.end();
for (; it != end; ++it)
CO2_AWAIT_APPLY(sum +=, *it);
CO2_RETURN(sum);
} CO2_END
template<class T>
class TaskGroup
{
T _tasks;
public:
void post(co2::coroutine<>& c)
{
_tasks.run([h = c.detach()]{co2::coroutine<>{h}();});
}
~TaskGroup()
{
_tasks.wait();
}
bool await_ready() const
{
return false;
}
void await_suspend(co2::coroutine<>& c)
{
post(c);
}
void await_resume() const {}
};
struct asio_task_group
{
boost::asio::io_service io;
std::vector<std::thread> threads;
std::unique_ptr<boost::asio::io_service::work> work;
asio_task_group()
: threads(std::thread::hardware_concurrency())
, work(new boost::asio::io_service::work(io))
{
for (auto& t : threads)
t = std::thread([this] { io.run(); });
}
~asio_task_group()
{
work.reset();
for (auto& t : threads)
t.join();
}
void post(co2::coroutine<>& c)
{
io.post([h = c.detach()]{co2::coroutine<>{h}();});
}
bool await_ready() const
{
return false;
}
void await_suspend(co2::coroutine<>& c)
{
post(c);
}
void await_resume() const {}
};
struct non_sched
{
bool await_ready() const
{
return true;
}
void await_suspend(co2::coroutine<>&) {}
void await_resume() const {}
};
int main()
{
std::cout << "\n[Non]----------------\n";
{
non_sched sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
std::cout << "\n[LIFO]----------------\n";
{
lifo_queue sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
sched.run();
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
std::cout << "\n[FIFO]----------------\n";
{
fifo_queue sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
sched.run();
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
std::cout << "\n[PPL]----------------\n";
{
TaskGroup<concurrency::task_group> sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
std::cout << "\n[TBB]----------------\n";
{
TaskGroup<tbb::task_group> sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
std::cout << "\n[ASIO]----------------\n";
{
asio_task_group sched;
auto start = std::chrono::high_resolution_clock::now();
auto val = skynet(sched, 0, 1000000, 10);
co2::wait(val);
auto stop = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
std::cout << "ans: " << co2::get(val) << ", us: " << elapsed.count() << "\n";
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment