Created
April 27, 2013 05:48
-
-
Save yayj/5472033 to your computer and use it in GitHub Desktop.
ASIO ping-pong test modification for io_service per thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// client.cpp | |
// ~~~~~~~~~~ | |
// | |
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
#include "asio.hpp" | |
#include <algorithm> | |
#include <boost/bind.hpp> | |
#include <boost/mem_fn.hpp> | |
#include <iostream> | |
#include <list> | |
#include <string> | |
#include "handler_allocator.hpp" | |
class stats | |
{ | |
public: | |
stats() | |
: mutex_(), | |
total_bytes_written_(0), | |
total_bytes_read_(0) | |
{ | |
} | |
void add(size_t bytes_written, size_t bytes_read) | |
{ | |
asio::detail::mutex::scoped_lock lock(mutex_); | |
total_bytes_written_ += bytes_written; | |
total_bytes_read_ += bytes_read; | |
} | |
void print() | |
{ | |
asio::detail::mutex::scoped_lock lock(mutex_); | |
std::cout << total_bytes_written_ << " total bytes written\n"; | |
std::cout << total_bytes_read_ << " total bytes read\n"; | |
} | |
private: | |
asio::detail::mutex mutex_; | |
size_t total_bytes_written_; | |
size_t total_bytes_read_; | |
}; | |
class session | |
{ | |
public: | |
session(asio::io_service& ios, size_t block_size, stats& s) | |
: strand_(ios), | |
socket_(ios), | |
block_size_(block_size), | |
read_data_(new char[block_size]), | |
read_data_length_(0), | |
write_data_(new char[block_size]), | |
unwritten_count_(0), | |
bytes_written_(0), | |
bytes_read_(0), | |
stats_(s) | |
{ | |
for (size_t i = 0; i < block_size_; ++i) | |
write_data_[i] = static_cast<char>(i % 128); | |
} | |
~session() | |
{ | |
stats_.add(bytes_written_, bytes_read_); | |
delete[] read_data_; | |
delete[] write_data_; | |
} | |
void start(asio::ip::tcp::resolver::iterator endpoint_iterator) | |
{ | |
asio::async_connect(socket_, endpoint_iterator, | |
strand_.wrap(boost::bind(&session::handle_connect, this, | |
asio::placeholders::error))); | |
} | |
void stop() | |
{ | |
strand_.post(boost::bind(&session::close_socket, this)); | |
} | |
private: | |
void handle_connect(const asio::error_code& err) | |
{ | |
if (!err) | |
{ | |
asio::error_code set_option_err; | |
asio::ip::tcp::no_delay no_delay(true); | |
socket_.set_option(no_delay, set_option_err); | |
if (!set_option_err) | |
{ | |
++unwritten_count_; | |
async_write(socket_, asio::buffer(write_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(write_allocator_, | |
boost::bind(&session::handle_write, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
} | |
} | |
void handle_read(const asio::error_code& err, size_t length) | |
{ | |
if (!err) | |
{ | |
bytes_read_ += length; | |
read_data_length_ = length; | |
++unwritten_count_; | |
if (unwritten_count_ == 1) | |
{ | |
std::swap(read_data_, write_data_); | |
async_write(socket_, asio::buffer(write_data_, read_data_length_), | |
strand_.wrap( | |
make_custom_alloc_handler(write_allocator_, | |
boost::bind(&session::handle_write, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
} | |
} | |
void handle_write(const asio::error_code& err, size_t length) | |
{ | |
if (!err && length > 0) | |
{ | |
bytes_written_ += length; | |
--unwritten_count_; | |
if (unwritten_count_ == 1) | |
{ | |
std::swap(read_data_, write_data_); | |
async_write(socket_, asio::buffer(write_data_, read_data_length_), | |
strand_.wrap( | |
make_custom_alloc_handler(write_allocator_, | |
boost::bind(&session::handle_write, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
} | |
} | |
void close_socket() | |
{ | |
socket_.close(); | |
} | |
private: | |
asio::io_service::strand strand_; | |
asio::ip::tcp::socket socket_; | |
size_t block_size_; | |
char* read_data_; | |
size_t read_data_length_; | |
char* write_data_; | |
int unwritten_count_; | |
size_t bytes_written_; | |
size_t bytes_read_; | |
stats& stats_; | |
handler_allocator read_allocator_; | |
handler_allocator write_allocator_; | |
}; | |
class client | |
{ | |
public: | |
client(asio::io_service& ios, | |
const asio::ip::tcp::resolver::iterator endpoint_iterator, | |
size_t block_size, size_t session_count, int timeout) | |
: io_service_(ios), | |
stop_timer_(ios), | |
sessions_(), | |
stats_() | |
{ | |
stop_timer_.expires_from_now(boost::posix_time::seconds(timeout)); | |
stop_timer_.async_wait(boost::bind(&client::handle_timeout, this)); | |
for (size_t i = 0; i < session_count; ++i) | |
{ | |
session* new_session = new session(io_service_, block_size, stats_); | |
new_session->start(endpoint_iterator); | |
sessions_.push_back(new_session); | |
} | |
} | |
~client() | |
{ | |
while (!sessions_.empty()) | |
{ | |
delete sessions_.front(); | |
sessions_.pop_front(); | |
} | |
stats_.print(); | |
} | |
void handle_timeout() | |
{ | |
std::for_each(sessions_.begin(), sessions_.end(), | |
boost::mem_fn(&session::stop)); | |
} | |
private: | |
asio::io_service& io_service_; | |
asio::deadline_timer stop_timer_; | |
std::list<session*> sessions_; | |
stats stats_; | |
}; | |
void start_client(size_t block_size, size_t session_count, int timeout, char const* host, char const* port) | |
{ | |
asio::io_service ios; | |
asio::ip::tcp::resolver r(ios); | |
asio::ip::tcp::resolver::iterator iter = | |
r.resolve(asio::ip::tcp::resolver::query(host, port)); | |
client c(ios, iter, block_size, session_count, timeout); | |
ios.run(); | |
} | |
int main(int argc, char* argv[]) | |
{ | |
try | |
{ | |
if (argc != 7) | |
{ | |
std::cerr << "Usage: client <host> <port> <threads> <blocksize> "; | |
std::cerr << "<sessions> <time>\n"; | |
return 1; | |
} | |
using namespace std; // For atoi. | |
const char* host = argv[1]; | |
const char* port = argv[2]; | |
int thread_count = atoi(argv[3]); | |
size_t block_size = atoi(argv[4]); | |
size_t session_count = atoi(argv[5]); | |
int timeout = atoi(argv[6]); | |
std::list<asio::thread*> threads; | |
while (--thread_count > 0) | |
{ | |
asio::thread* new_thread = new asio::thread( | |
boost::bind(start_client, block_size, session_count, timeout, host, port)); | |
threads.push_back(new_thread); | |
} | |
start_client(block_size, session_count, timeout, host, port); | |
while (!threads.empty()) | |
{ | |
threads.front()->join(); | |
delete threads.front(); | |
threads.pop_front(); | |
} | |
} | |
catch (std::exception& e) | |
{ | |
std::cerr << "Exception: " << e.what() << "\n"; | |
} | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// server.cpp | |
// ~~~~~~~~~~ | |
// | |
// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
#include "asio.hpp" | |
#include <algorithm> | |
#include <boost/bind.hpp> | |
#include <iostream> | |
#include <list> | |
#include <memory> | |
#include <vector> | |
#include "handler_allocator.hpp" | |
class session | |
{ | |
public: | |
session(asio::io_service& ios, size_t block_size) | |
: io_service_(ios), | |
strand_(ios), | |
socket_(ios), | |
block_size_(block_size), | |
read_data_(new char[block_size]), | |
read_data_length_(0), | |
write_data_(new char[block_size]), | |
unsent_count_(0), | |
op_count_(0) | |
{ | |
} | |
~session() | |
{ | |
delete[] read_data_; | |
delete[] write_data_; | |
} | |
asio::ip::tcp::socket& socket() | |
{ | |
return socket_; | |
} | |
void start() | |
{ | |
asio::error_code set_option_err; | |
asio::ip::tcp::no_delay no_delay(true); | |
socket_.set_option(no_delay, set_option_err); | |
if (!set_option_err) | |
{ | |
++op_count_; | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
else | |
{ | |
io_service_.post(boost::bind(&session::destroy, this)); | |
} | |
} | |
void handle_read(const asio::error_code& err, size_t length) | |
{ | |
--op_count_; | |
if (!err) | |
{ | |
read_data_length_ = length; | |
++unsent_count_; | |
if (unsent_count_ == 1) | |
{ | |
op_count_ += 2; | |
std::swap(read_data_, write_data_); | |
async_write(socket_, asio::buffer(write_data_, read_data_length_), | |
strand_.wrap( | |
make_custom_alloc_handler(write_allocator_, | |
boost::bind(&session::handle_write, this, | |
asio::placeholders::error)))); | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
} | |
if (op_count_ == 0) | |
io_service_.post(boost::bind(&session::destroy, this)); | |
} | |
void handle_write(const asio::error_code& err) | |
{ | |
--op_count_; | |
if (!err) | |
{ | |
--unsent_count_; | |
if (unsent_count_ == 1) | |
{ | |
op_count_ += 2; | |
std::swap(read_data_, write_data_); | |
async_write(socket_, asio::buffer(write_data_, read_data_length_), | |
strand_.wrap( | |
make_custom_alloc_handler(write_allocator_, | |
boost::bind(&session::handle_write, this, | |
asio::placeholders::error)))); | |
socket_.async_read_some(asio::buffer(read_data_, block_size_), | |
strand_.wrap( | |
make_custom_alloc_handler(read_allocator_, | |
boost::bind(&session::handle_read, this, | |
asio::placeholders::error, | |
asio::placeholders::bytes_transferred)))); | |
} | |
} | |
if (op_count_ == 0) | |
io_service_.post(boost::bind(&session::destroy, this)); | |
} | |
static void destroy(session* s) | |
{ | |
delete s; | |
} | |
private: | |
asio::io_service& io_service_; | |
asio::io_service::strand strand_; | |
asio::ip::tcp::socket socket_; | |
size_t block_size_; | |
char* read_data_; | |
size_t read_data_length_; | |
char* write_data_; | |
int unsent_count_; | |
int op_count_; | |
handler_allocator read_allocator_; | |
handler_allocator write_allocator_; | |
}; | |
void session_runner(session* new_session, const asio::error_code& err) | |
{ | |
if (!err) | |
{ | |
new_session->start(); | |
} | |
else | |
{ | |
delete new_session; | |
} | |
} | |
struct work_thread | |
{ | |
std::unique_ptr<asio::thread> worker_; | |
std::unique_ptr<asio::io_service::work> flag_; | |
asio::io_service ios_; | |
work_thread() : ios_(), worker_(), flag_() | |
{ | |
flag_.reset(new asio::io_service::work(ios_)); | |
worker_.reset(new asio::thread(boost::bind(&asio::io_service::run, &ios_))); | |
} | |
~work_thread() | |
{ | |
flag_ = nullptr; | |
worker_->join(); | |
} | |
}; | |
class server | |
{ | |
public: | |
server(asio::io_service& ios, const asio::ip::tcp::endpoint& endpoint, | |
size_t block_size, int thread_count) | |
: io_service_(ios), | |
acceptor_(ios), | |
block_size_(block_size), | |
pos_(0), thread_count_(thread_count), | |
workers_(thread_count - 1) | |
{ | |
acceptor_.open(endpoint.protocol()); | |
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(1)); | |
acceptor_.bind(endpoint); | |
acceptor_.listen(); | |
start_accept(); | |
} | |
void start_accept() | |
{ | |
session* new_session = new session(io_service_, block_size_); | |
acceptor_.async_accept(new_session->socket(), | |
boost::bind(&server::handle_accept, this, new_session, | |
asio::placeholders::error)); | |
} | |
void handle_accept(session* new_session, const asio::error_code& err) | |
{ | |
pos_ = (pos_ + 1) % thread_count_; | |
if (pos_ == thread_count_ - 1) | |
session_runner(new_session, err); | |
else | |
workers_[pos_].ios_.post( | |
boost::bind(session_runner, new_session, err)); | |
start_accept(); | |
} | |
private: | |
asio::io_service& io_service_; | |
asio::ip::tcp::acceptor acceptor_; | |
size_t block_size_; | |
std::vector<work_thread> workers_; | |
int pos_; | |
int thread_count_; | |
}; | |
int main(int argc, char* argv[]) | |
{ | |
try | |
{ | |
if (argc != 5) | |
{ | |
std::cerr << "Usage: server <address> <port> <threads> <blocksize>\n"; | |
return 1; | |
} | |
using namespace std; // For atoi. | |
asio::ip::address address = asio::ip::address::from_string(argv[1]); | |
short port = atoi(argv[2]); | |
int thread_count = atoi(argv[3]); | |
size_t block_size = atoi(argv[4]); | |
asio::io_service ios(thread_count); | |
server s(ios, asio::ip::tcp::endpoint(address, port), block_size, thread_count); | |
// Threads not currently supported in this test. | |
// std::list<asio::thread*> threads; | |
// while (--thread_count > 0) | |
// { | |
// asio::thread* new_thread = new asio::thread( | |
// boost::bind(&asio::io_service::run, &ios)); | |
// threads.push_back(new_thread); | |
// } | |
ios.run(); | |
// while (!threads.empty()) | |
// { | |
// threads.front()->join(); | |
// delete threads.front(); | |
// threads.pop_front(); | |
// } | |
} | |
catch (std::exception& e) | |
{ | |
std::cerr << "Exception: " << e.what() << "\n"; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment