Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
#include <cstddef>
#include <algorithm>
#include <atomic>
#include <exception>
#include <iostream>
#include <random>
#include <thread>
#include <vector>
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/program_options.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/timer/timer.hpp>
namespace asio = boost::asio;
struct work
{
static std::size_t nloop_max;
static thread_local std::mt19937 rnd;
void operator()()
{
auto const volatile count
= std::uniform_int_distribution<std::size_t>(nloop_max * 0.9, nloop_max)(rnd);
for (auto i = std::size_t{}; i < count; ++i) {}
}
};
thread_local std::mt19937 work::rnd{};
std::size_t work::nloop_max{};
auto create_asio_work(std::vector<asio::io_service>& io_service)
-> std::vector<asio::io_service::work>
{
auto works = std::vector<asio::io_service::work>{};
works.reserve(io_service.size());
for (auto&& ios : io_service) {
works.emplace_back(ios);
}
return works;
}
auto create_consumer_threads(
std::size_t const nconsumer
, std::vector<asio::io_service>& io_service
, boost::barrier& start_barrier, boost::barrier& consumer_stop_barrier)
-> std::vector<std::thread>
{
std::cout << "setup " << nconsumer << " consumer thread(s)..." << std::endl;
auto consumers = std::vector<std::thread>{};
consumers.reserve(nconsumer);
for (auto i = std::size_t{}; i < nconsumer; ++i) {
consumers.emplace_back([&, i]{
start_barrier.wait();
try {
io_service[i % io_service.size()].run();
}
catch (std::exception const& e) {
std::cerr << "consumer " << i << ':' << e.what() << std::endl;
}
consumer_stop_barrier.wait();
});
}
return consumers;
}
template <class Executor>
auto create_producer_threads(
std::size_t const nproducer, std::size_t const nwork
, std::vector<Executor>& executor
, boost::barrier& start_barrier, boost::barrier& producer_stop_barrier)
-> std::vector<std::thread>
{
auto const nwork_per_thread = (nproducer > 0) ? nwork / nproducer : 0;
std::cout
<< "setup " << nproducer << " producer thread(s)"
<< " with " << nwork_per_thread << " work(s) per thread..."
<< std::endl;
auto producers = std::vector<std::thread>{};
producers.reserve(nproducer);
for (auto i = std::size_t{}; i < nproducer; ++i) {
producers.emplace_back([&, i, nwork_per_thread]{
start_barrier.wait();
try {
for (auto j = std::size_t{}; j < nwork_per_thread; ++j) {
executor[i % executor.size()].post(work{});
}
}
catch (std::exception const& e) {
std::cerr << "producer " << i << ':' << e.what() << std::endl;
}
producer_stop_barrier.wait();
});
}
return producers;
}
template <class Executor>
void run(std::vector<asio::io_service>& io_service
, std::size_t const nconsumer
, std::vector<Executor>& executor
, std::size_t const nproducer, std::size_t const nwork)
{
auto asio_works = create_asio_work(io_service);
boost::barrier start_barrier(nconsumer + nproducer + 1);
boost::barrier consumer_stop_barrier(nconsumer + 1);
boost::barrier producer_stop_barrier(nproducer + 1);
auto consumers = create_consumer_threads(
nconsumer, io_service, start_barrier, consumer_stop_barrier);
auto producers = create_producer_threads(
nproducer, nwork, executor, start_barrier, producer_stop_barrier);
{
std::cout << "running..." << std::endl;
start_barrier.wait();
boost::timer::auto_cpu_timer timer{};
producer_stop_barrier.wait();
asio_works.clear();
consumer_stop_barrier.wait();
}
for (auto&& t : producers) {
t.join();
}
for (auto&& t : consumers) {
t.join();
}
}
template <class Executor>
void set_prework(std::vector<Executor>& executor, std::size_t const nwork)
{
std::cout << "setup " << nwork << " prework(s)..." << std::endl;
for (auto i = std::size_t{}; i < nwork; ++i) {
executor[i % executor.size()].post(work{});
}
}
int main(int argc, char const* argv[])
{
try {
namespace popts = boost::program_options;
popts::options_description desc{"perform io_service on consumer/producer model"};
desc.add_options()
("help,h", "display help message")
("nio_service,i", popts::value<std::size_t>()->default_value(1), "the number of io_services")
("nconsumer,c", popts::value<std::size_t>()->default_value(1), "the number of consumers")
("nproducer,p", popts::value<std::size_t>()->default_value(0), "the number of producers")
("nwork,w", popts::value<std::size_t>()->default_value(0), "the number of works that producers post to io_services")
("nprework,r", popts::value<std::size_t>()->default_value(0), "the number of works which are posted to io_services in advance")
("nloop,l", popts::value<std::size_t>(&work::nloop_max)->default_value(1000), "max loop count per work")
("strand,s", "use strand as executor")
;
auto vm = popts::variables_map{};
popts::store(popts::parse_command_line(argc, argv, desc), vm);
popts::notify(vm);
if (vm.count("help")) {
std::cout << desc << std::endl;
return 0;
}
auto const nio_service = vm["nio_service"].as<std::size_t>();
auto const nconsumer
= std::max(vm["nconsumer"].as<std::size_t>(), nio_service);
auto const nproducer = vm["nproducer"].as<std::size_t>();
auto const nwork = vm["nwork"].as<std::size_t>();
auto const nprework = vm["nprework"].as<std::size_t>();
if (!vm.count("strand")) {
std::cout << "use " << nio_service << " io_service(s)" << std::endl;
auto io_service = std::vector<asio::io_service>(nio_service);
set_prework(io_service, nprework);
run(io_service, nconsumer, io_service, nproducer, nwork);
}
else {
std::cout << "use " << nio_service << " strand(s)" << std::endl;
auto io_service = std::vector<asio::io_service>(1);
auto strand = std::vector<asio::io_service::strand>{};
strand.reserve(nio_service);
for (auto i = std::size_t{}; i < nio_service; ++i) {
strand.emplace_back(io_service[0]);
}
set_prework(strand, nprework);
run(io_service, nconsumer, strand, nproducer, nwork);
}
}
catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment