ASIO ping-pong performance test, modified to run one io_service per thread.
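The gist modifies both halves of Asio's ping-pong performance test (client.cpp and server.cpp below) so that each thread drives its own io_service instead of every thread sharing a single one. Stripped of the benchmark logic, that pattern looks roughly like the following minimal sketch; it is not part of the gist, and run_one and the hard-coded four loops are purely illustrative.

#include "asio.hpp"
#include <iostream>
#include <list>

// Each thread constructs and runs its own io_service; no event loop is
// shared, so handlers queued on different loops never contend with each
// other. (Output from different threads may interleave.)
void run_one(int id)
{
  asio::io_service ios;
  ios.post([id] { std::cout << "handler on loop " << id << "\n"; });
  ios.run(); // returns once the posted work has run
}

int main()
{
  std::list<asio::thread*> threads;
  for (int i = 1; i < 4; ++i)                  // three extra loops...
    threads.push_back(new asio::thread([i] { run_one(i); }));
  run_one(0);                                  // ...plus one on this thread

  while (!threads.empty())
  {
    threads.front()->join();
    delete threads.front();
    threads.pop_front();
  }
  return 0;
}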
//
// client.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "asio.hpp"
#include <algorithm>
#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>
#include <iostream>
#include <list>
#include <string>
#include "handler_allocator.hpp"
// Accumulates the total bytes written and read by all sessions of a client;
// the totals are printed when the client shuts down.
class stats
{
public:
  stats()
    : mutex_(),
      total_bytes_written_(0),
      total_bytes_read_(0)
  {
  }

  void add(size_t bytes_written, size_t bytes_read)
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    total_bytes_written_ += bytes_written;
    total_bytes_read_ += bytes_read;
  }

  void print()
  {
    asio::detail::mutex::scoped_lock lock(mutex_);
    std::cout << total_bytes_written_ << " total bytes written\n";
    std::cout << total_bytes_read_ << " total bytes read\n";
  }

private:
  asio::detail::mutex mutex_;
  size_t total_bytes_written_;
  size_t total_bytes_read_;
};
// One echo connection: it writes a block, reads whatever comes back, and
// keeps swapping the two buffers so data ping-pongs with the server.
class session
{
public:
  session(asio::io_service& ios, size_t block_size, stats& s)
    : strand_(ios),
      socket_(ios),
      block_size_(block_size),
      read_data_(new char[block_size]),
      read_data_length_(0),
      write_data_(new char[block_size]),
      unwritten_count_(0),
      bytes_written_(0),
      bytes_read_(0),
      stats_(s)
  {
    for (size_t i = 0; i < block_size_; ++i)
      write_data_[i] = static_cast<char>(i % 128);
  }

  ~session()
  {
    stats_.add(bytes_written_, bytes_read_);
    delete[] read_data_;
    delete[] write_data_;
  }

  void start(asio::ip::tcp::resolver::iterator endpoint_iterator)
  {
    asio::async_connect(socket_, endpoint_iterator,
        strand_.wrap(boost::bind(&session::handle_connect, this,
            asio::placeholders::error)));
  }

  void stop()
  {
    strand_.post(boost::bind(&session::close_socket, this));
  }

private:
  void handle_connect(const asio::error_code& err)
  {
    if (!err)
    {
      asio::error_code set_option_err;
      asio::ip::tcp::no_delay no_delay(true);
      socket_.set_option(no_delay, set_option_err);
      if (!set_option_err)
      {
        ++unwritten_count_;
        async_write(socket_, asio::buffer(write_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(write_allocator_,
                boost::bind(&session::handle_write, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
        socket_.async_read_some(asio::buffer(read_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(read_allocator_,
                boost::bind(&session::handle_read, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
      }
    }
  }

  void handle_read(const asio::error_code& err, size_t length)
  {
    if (!err)
    {
      bytes_read_ += length;

      read_data_length_ = length;
      ++unwritten_count_;
      if (unwritten_count_ == 1)
      {
        // No write is in flight: swap buffers and echo the block just read.
        std::swap(read_data_, write_data_);
        async_write(socket_, asio::buffer(write_data_, read_data_length_),
            strand_.wrap(
              make_custom_alloc_handler(write_allocator_,
                boost::bind(&session::handle_write, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
        socket_.async_read_some(asio::buffer(read_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(read_allocator_,
                boost::bind(&session::handle_read, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
      }
    }
  }

  void handle_write(const asio::error_code& err, size_t length)
  {
    if (!err && length > 0)
    {
      bytes_written_ += length;

      --unwritten_count_;
      if (unwritten_count_ == 1)
      {
        // Another block arrived while this write was in flight; echo it next.
        std::swap(read_data_, write_data_);
        async_write(socket_, asio::buffer(write_data_, read_data_length_),
            strand_.wrap(
              make_custom_alloc_handler(write_allocator_,
                boost::bind(&session::handle_write, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
        socket_.async_read_some(asio::buffer(read_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(read_allocator_,
                boost::bind(&session::handle_read, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
      }
    }
  }

  void close_socket()
  {
    socket_.close();
  }

private:
  asio::io_service::strand strand_;
  asio::ip::tcp::socket socket_;
  size_t block_size_;
  char* read_data_;
  size_t read_data_length_;
  char* write_data_;
  int unwritten_count_;
  size_t bytes_written_;
  size_t bytes_read_;
  stats& stats_;
  handler_allocator read_allocator_;
  handler_allocator write_allocator_;
};
class client
{
public:
  client(asio::io_service& ios,
      const asio::ip::tcp::resolver::iterator endpoint_iterator,
      size_t block_size, size_t session_count, int timeout)
    : io_service_(ios),
      stop_timer_(ios),
      sessions_(),
      stats_()
  {
    stop_timer_.expires_from_now(boost::posix_time::seconds(timeout));
    stop_timer_.async_wait(boost::bind(&client::handle_timeout, this));

    for (size_t i = 0; i < session_count; ++i)
    {
      session* new_session = new session(io_service_, block_size, stats_);
      new_session->start(endpoint_iterator);
      sessions_.push_back(new_session);
    }
  }

  ~client()
  {
    while (!sessions_.empty())
    {
      delete sessions_.front();
      sessions_.pop_front();
    }

    stats_.print();
  }

  void handle_timeout()
  {
    std::for_each(sessions_.begin(), sessions_.end(),
        boost::mem_fn(&session::stop));
  }

private:
  asio::io_service& io_service_;
  asio::deadline_timer stop_timer_;
  std::list<session*> sessions_;
  stats stats_;
};
// Each call owns a complete io_service: resolver, client and all of its
// sessions live on this one event loop, so one loop runs per thread.
void start_client(size_t block_size, size_t session_count, int timeout,
    char const* host, char const* port)
{
  asio::io_service ios;
  asio::ip::tcp::resolver r(ios);
  asio::ip::tcp::resolver::iterator iter =
    r.resolve(asio::ip::tcp::resolver::query(host, port));
  client c(ios, iter, block_size, session_count, timeout);
  ios.run();
}
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 7)
    {
      std::cerr << "Usage: client <host> <port> <threads> <blocksize> ";
      std::cerr << "<sessions> <time>\n";
      return 1;
    }

    using namespace std; // For atoi.
    const char* host = argv[1];
    const char* port = argv[2];
    int thread_count = atoi(argv[3]);
    size_t block_size = atoi(argv[4]);
    size_t session_count = atoi(argv[5]);
    int timeout = atoi(argv[6]);

    // Spawn thread_count - 1 extra threads, each running its own client on
    // its own io_service, then run the last one on this thread.
    std::list<asio::thread*> threads;
    while (--thread_count > 0)
    {
      asio::thread* new_thread = new asio::thread(
          boost::bind(start_client, block_size, session_count, timeout,
              host, port));
      threads.push_back(new_thread);
    }

    start_client(block_size, session_count, timeout, host, port);

    while (!threads.empty())
    {
      threads.front()->join();
      delete threads.front();
      threads.pop_front();
    }
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}
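Both files include handler_allocator.hpp, which is not reproduced in the gist. It comes from Asio's custom allocation example and supplies handler_allocator plus make_custom_alloc_handler, which recycle a small per-session buffer for the temporary memory Asio needs while an asynchronous operation is outstanding. The sketch below shows roughly what such a header contains, assuming the asio_handler_allocate/asio_handler_deallocate hook API of the Asio versions current in 2013; the buffer size and names are illustrative, and the real file shipped with the Asio examples should be used instead.

// Sketch of a handler_allocator.hpp in the spirit of Asio's allocation
// example (NOT the gist's actual header).
#include <cstddef>
#include <type_traits>

// A small, reusable buffer that satisfies one handler allocation at a time,
// falling back to the global heap when it is busy or too small.
class handler_allocator
{
public:
  handler_allocator() : in_use_(false) {}

  void* allocate(std::size_t size)
  {
    if (!in_use_ && size <= sizeof(storage_))
    {
      in_use_ = true;
      return &storage_;
    }
    return ::operator new(size);
  }

  void deallocate(void* pointer)
  {
    if (pointer == &storage_)
      in_use_ = false;
    else
      ::operator delete(pointer);
  }

private:
  std::aligned_storage<1024>::type storage_; // 1024 bytes is an assumption
  bool in_use_;
};

// Wraps a completion handler so that Asio's allocation hooks (found via ADL
// on the handler type in this era of Asio) draw their storage from the
// allocator above instead of the heap.
template <typename Handler>
class custom_alloc_handler
{
public:
  custom_alloc_handler(handler_allocator& a, Handler h)
    : allocator_(a), handler_(h) {}

  template <typename Arg1>
  void operator()(Arg1 arg1) { handler_(arg1); }

  template <typename Arg1, typename Arg2>
  void operator()(Arg1 arg1, Arg2 arg2) { handler_(arg1, arg2); }

  friend void* asio_handler_allocate(std::size_t size,
      custom_alloc_handler<Handler>* this_handler)
  {
    return this_handler->allocator_.allocate(size);
  }

  friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
      custom_alloc_handler<Handler>* this_handler)
  {
    this_handler->allocator_.deallocate(pointer);
  }

private:
  handler_allocator& allocator_;
  Handler handler_;
};

template <typename Handler>
inline custom_alloc_handler<Handler> make_custom_alloc_handler(
    handler_allocator& a, Handler h)
{
  return custom_alloc_handler<Handler>(a, h);
}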
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "asio.hpp"
#include <algorithm>
#include <boost/bind.hpp>
#include <iostream>
#include <list>
#include <memory>
#include <vector>
#include "handler_allocator.hpp"
// One accepted connection: every block read is echoed back, and the session
// deletes itself (via destroy) once no asynchronous operations remain.
class session
{
public:
  session(asio::io_service& ios, size_t block_size)
    : io_service_(ios),
      strand_(ios),
      socket_(ios),
      block_size_(block_size),
      read_data_(new char[block_size]),
      read_data_length_(0),
      write_data_(new char[block_size]),
      unsent_count_(0),
      op_count_(0)
  {
  }

  ~session()
  {
    delete[] read_data_;
    delete[] write_data_;
  }

  asio::ip::tcp::socket& socket()
  {
    return socket_;
  }

  void start()
  {
    asio::error_code set_option_err;
    asio::ip::tcp::no_delay no_delay(true);
    socket_.set_option(no_delay, set_option_err);
    if (!set_option_err)
    {
      ++op_count_;
      socket_.async_read_some(asio::buffer(read_data_, block_size_),
          strand_.wrap(
            make_custom_alloc_handler(read_allocator_,
              boost::bind(&session::handle_read, this,
                asio::placeholders::error,
                asio::placeholders::bytes_transferred))));
    }
    else
    {
      io_service_.post(boost::bind(&session::destroy, this));
    }
  }

  void handle_read(const asio::error_code& err, size_t length)
  {
    --op_count_;

    if (!err)
    {
      read_data_length_ = length;
      ++unsent_count_;
      if (unsent_count_ == 1)
      {
        // No write in flight: echo the block back and start the next read.
        op_count_ += 2;
        std::swap(read_data_, write_data_);
        async_write(socket_, asio::buffer(write_data_, read_data_length_),
            strand_.wrap(
              make_custom_alloc_handler(write_allocator_,
                boost::bind(&session::handle_write, this,
                  asio::placeholders::error))));
        socket_.async_read_some(asio::buffer(read_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(read_allocator_,
                boost::bind(&session::handle_read, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
      }
    }

    if (op_count_ == 0)
      io_service_.post(boost::bind(&session::destroy, this));
  }

  void handle_write(const asio::error_code& err)
  {
    --op_count_;

    if (!err)
    {
      --unsent_count_;
      if (unsent_count_ == 1)
      {
        // A block was read while the previous write was still in flight.
        op_count_ += 2;
        std::swap(read_data_, write_data_);
        async_write(socket_, asio::buffer(write_data_, read_data_length_),
            strand_.wrap(
              make_custom_alloc_handler(write_allocator_,
                boost::bind(&session::handle_write, this,
                  asio::placeholders::error))));
        socket_.async_read_some(asio::buffer(read_data_, block_size_),
            strand_.wrap(
              make_custom_alloc_handler(read_allocator_,
                boost::bind(&session::handle_read, this,
                  asio::placeholders::error,
                  asio::placeholders::bytes_transferred))));
      }
    }

    if (op_count_ == 0)
      io_service_.post(boost::bind(&session::destroy, this));
  }

  static void destroy(session* s)
  {
    delete s;
  }

private:
  asio::io_service& io_service_;
  asio::io_service::strand strand_;
  asio::ip::tcp::socket socket_;
  size_t block_size_;
  char* read_data_;
  size_t read_data_length_;
  char* write_data_;
  int unsent_count_; // blocks read but not yet written back
  int op_count_;     // asynchronous operations still outstanding
  handler_allocator read_allocator_;
  handler_allocator write_allocator_;
};
void session_runner(session* new_session, const asio::error_code& err)
{
  if (!err)
  {
    new_session->start();
  }
  else
  {
    delete new_session;
  }
}
// One worker: its own io_service, kept alive by a work object and driven by
// a dedicated thread. Resetting the work object lets run() return, and the
// destructor joins the thread before the io_service goes away.
struct work_thread
{
  asio::io_service ios_;
  std::unique_ptr<asio::io_service::work> flag_;
  std::unique_ptr<asio::thread> worker_;

  work_thread() : ios_(), flag_(), worker_()
  {
    flag_.reset(new asio::io_service::work(ios_));
    worker_.reset(new asio::thread(boost::bind(&asio::io_service::run, &ios_)));
  }

  ~work_thread()
  {
    flag_.reset();
    worker_->join();
  }
};
// Accepts connections on the shared io_service and starts each new session
// either on this io_service or on one of the worker threads, in round-robin
// order.
class server
{
public:
  server(asio::io_service& ios, const asio::ip::tcp::endpoint& endpoint,
      size_t block_size, int thread_count)
    : io_service_(ios),
      acceptor_(ios),
      block_size_(block_size),
      pos_(0),
      thread_count_(thread_count),
      workers_(thread_count - 1)
  {
    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(1));
    acceptor_.bind(endpoint);
    acceptor_.listen();

    start_accept();
  }

  void start_accept()
  {
    session* new_session = new session(io_service_, block_size_);

    acceptor_.async_accept(new_session->socket(),
        boost::bind(&server::handle_accept, this, new_session,
          asio::placeholders::error));
  }

  void handle_accept(session* new_session, const asio::error_code& err)
  {
    // Round-robin: the last slot starts the session on this io_service,
    // every other slot posts it to one of the worker threads.
    pos_ = (pos_ + 1) % thread_count_;
    if (pos_ == thread_count_ - 1)
      session_runner(new_session, err);
    else
      workers_[pos_].ios_.post(
          boost::bind(session_runner, new_session, err));

    start_accept();
  }

private:
  asio::io_service& io_service_;
  asio::ip::tcp::acceptor acceptor_;
  size_t block_size_;
  int pos_;
  int thread_count_;
  std::vector<work_thread> workers_;
};
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 5)
    {
      std::cerr << "Usage: server <address> <port> <threads> <blocksize>\n";
      return 1;
    }

    using namespace std; // For atoi.
    asio::ip::address address = asio::ip::address::from_string(argv[1]);
    short port = atoi(argv[2]);
    int thread_count = atoi(argv[3]);
    size_t block_size = atoi(argv[4]);

    asio::io_service ios(thread_count);

    server s(ios, asio::ip::tcp::endpoint(address, port), block_size,
        thread_count);

    // The original shared-io_service thread pool is not used here; the
    // server creates its own worker threads, one io_service each.
    // std::list<asio::thread*> threads;
    // while (--thread_count > 0)
    // {
    //   asio::thread* new_thread = new asio::thread(
    //       boost::bind(&asio::io_service::run, &ios));
    //   threads.push_back(new_thread);
    // }

    ios.run();

    // while (!threads.empty())
    // {
    //   threads.front()->join();
    //   delete threads.front();
    //   threads.pop_front();
    // }
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}
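For reference, both programs print their expected arguments when invoked incorrectly, so a typical run looks like the following, where every concrete value is only a placeholder:

  server 0.0.0.0 12345 4 1024
  client 127.0.0.1 12345 4 1024 100 30

That is, four io_service loops (and threads) on each side, 1024-byte blocks, 100 sessions per client thread, and a 30-second measurement window, after which each client thread prints the totals gathered by its own stats object.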