Skip to content

Instantly share code, notes, and snippets.

@owent
Last active December 17, 2015 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save owent/5660983 to your computer and use it in GitHub Desktop.
Save owent/5660983 to your computer and use it in GitHub Desktop.
Boost.Asio 性能测试代码 GCC版本: 4.8.0 编译命令: gcc *.cpp -o test.exe -I/home/owent/Libs/boost/x86_all/include -L/home/owent/Libs/boost/x86_all/lib -lboost_random -lboost_thread -lboost_system -lstdc++ -lrt -pthread -D_POSIX_MT_ -std=c++11 -O2 -g -ggdb
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
#include <vector>
#include <assert.h>
#include <boost/asio.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/atomic.hpp>
int iSendCount = 256;
/**
* 客户端网络线程函数
*/
void client_thread()
{
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
// Step 1. 创建服务对象
static boost::asio::io_service stMainService;
// Step 2. 创建地址生成器及生成地址
boost::asio::ip::tcp::resolver stResolver(stMainService);
// 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("10.24.248.86", "8731"));
// Step 3. 创建Socket
boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(stMainService)
);
// Step 4. 连接服务器
ptrCurSock->connect(stEndpoint);
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Connect Success => "<< ptrCurSock->local_endpoint().port()<< std::endl;
int iTimes = iSendCount;
while( -- iTimes) { try {
//** 睡眠一段时间再发数据
static boost::random::mt19937 rng;
static boost::random::uniform_int_distribution<> sleep_rdm(0,5);
int iSleepFor = 1 + sleep_rdm(rng);
boost::this_thread::sleep_for(boost::chrono::milliseconds(iSleepFor));
// ** 构造数据
boost::asio::streambuf stSendedBuff;
std::ostream stSendedBuffOS(&stSendedBuff);
stSendedBuffOS<< "---- Client Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->local_endpoint().port()<< " Say Hello";
// Step 5 发送数据
size_t ullBufSize = stSendedBuff.size();
boost::asio::write(*ptrCurSock, boost::asio::buffer(&ullBufSize, sizeof(ullBufSize)), boost::asio::transfer_exactly(sizeof(ullBufSize)));
boost::asio::write(*ptrCurSock, stSendedBuff, boost::asio::transfer_at_least(stSendedBuff.size() - sizeof(ullBufSize)));
// sent data is removed from input sequence
stSendedBuff.consume(stSendedBuff.size());
std::cout<< "**** Client Thread "<< boost::this_thread::get_id()<< " Sended"<< std::endl;
// Step 6 接收数据
char strRecvStr[2048];
ptrCurSock->read_some(boost::asio::buffer(strRecvStr, sizeof(size_t)));
ptrCurSock->read_some(boost::asio::buffer(strRecvStr + sizeof(size_t), *((size_t*)(strRecvStr))));
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Recv Data: "<< strRecvStr + sizeof(size_t)<< std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10 - iSleepFor));
} catch(...){} }
// Step 7. 等待服务关闭
stMainService.run();
// Step 8. Socket 关闭退出
ptrCurSock->close();
}
void print_asio_info();
int main(int argc, char* argv[]) {
std::vector<boost::thread*> ctl;
int max_client_num = 128;
if(argc > 1)
{
iSendCount = atoi(argv[1]);
}
if(argc > 2)
{
max_client_num = atoi(argv[2]);
}
for (int i = 0; i < max_client_num; ++ i)
{
// 打印输出错开
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
ctl.push_back(new boost::thread(client_thread));
}
for (size_t i = 0; i < ctl.size(); ++ i)
{
ctl[i]->join();
delete ctl[i];
}
print_asio_info();
return 0;
}
void print_asio_info()
{
#if defined(BOOST_ASIO_HAS_IOCP)
std::cout<< "Boost.Asio using iocp"<< std::endl;
#if defined(BOOST_ASIO_HAS_SERIAL_PORT)
std::cout<< "\tSerial Port: enabled"<< std::endl;
#else
std::cout<< "\tSerial Port: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE)
std::cout<< "\tStream handle: enabled"<< std::endl;
#else
std::cout<< "\tStream handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_RANDOM_ACCESS_HANDLE)
std::cout<< "\tWindows Random Access Handle: enabled"<< std::endl;
#else
std::cout<< "\tWindows Random Access Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OBJECT_HANDLE)
std::cout<< "\tObject Handle: enabled"<< std::endl;
#else
std::cout<< "\tObject Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
std::cout<< "\tWindows Overlapped Ptr: enabled"<< std::endl;
#else
std::cout<< "\tWindows Overlapped Ptr: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_EPOLL)
std::cout<< "Boost.Asio using epoll."<< std::endl;
#if defined(BOOST_ASIO_HAS_EVENTFD)
std::cout<< "\teventfd: enabled"<< std::endl;
#else
std::cout<< "\teventfd: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_TIMERFD)
std::cout<< "\timerfd: enabled"<< std::endl;
#else
std::cout<< "\timerfd: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_KQUEUE)
std::cout<< "Boost.Asio using kqueue"<< std::endl;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
std::cout<< "Boost.Asio using solaris: /dev/poll"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
std::cout<< "Posix Stream Descriptor: enabled"<< std::endl;
#else
std::cout<< "Posix Stream Descriptor: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::cout<< "Unix domain socket: enabled"<< std::endl;
#else
std::cout<< "Unix domain socket: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_SIGACTION)
std::cout<< "sigaction: enabled"<< std::endl;
#else
std::cout<< "sigaction: disabled"<< std::endl;
#endif
}
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
#include <vector>
#include <assert.h>
#include <boost/asio.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <thread>
#include <chrono>
#include <memory>
#include <boost/atomic.hpp>
/** 服务器 socket连接计数 **/
boost::atomic_int32_t g_iPkgPerSecond, g_iCurClientNumber;
bool bRunning = true;
class MySocket: public boost::asio::ip::tcp::socket
{
public:
MySocket(boost::asio::io_service& stSvr): boost::asio::ip::tcp::socket(stSvr){}
~MySocket(){ std::cout<< "Socket "<< native_handle()<< " released."<< std::endl; }
};
void signal_handler(
boost::asio::io_service& stMainService,
const boost::system::error_code& error,
int signal_number)
{
if (error)
{
std::cout<< error.message()<< std::endl;
}
std::cout<< "Catch a signal: "<< signal_number<< std::endl;
bRunning = false;
stMainService.stop();
}
void deadline_timer_handler(
boost::asio::deadline_timer& stSteadyTimer,
const boost::system::error_code& error // Result of operation.
)
{
if (error == boost::asio::error::operation_aborted)
{
std::cout<< "On Steady Timer Abouted"<< std::endl;
}
else
{
stSteadyTimer.expires_at(stSteadyTimer.expires_at() + boost::posix_time::seconds(1));
stSteadyTimer.async_wait(boost::bind(deadline_timer_handler, boost::ref(stSteadyTimer), _1));
}
std::cout<< "Pkg Per Second: "<< g_iPkgPerSecond.load()<< std::endl;
g_iPkgPerSecond.store(0);
}
/**
* 服务器异步发送数据回调函数
* @param [in] ptrBuffStr 发送的数据buff(传过来仅是为了给智能指针计数+1,防止释放数据的)
* @param [in] error 错误信息
* @param [in] bytes_transferred 发送的数据大小
*/
void server_thread_send_handler(
std::shared_ptr<std::string> ptrBuffStr,
const boost::system::error_code& error, // Result of operation.
std::size_t bytes_transferred
)
{
if (error)
{
std::cout<< "==== Server Thread "<< std::this_thread::get_id()<< " Send Failed: "<< error.message()<< std::endl;
}
}
/**
* 服务器异步接收数据回调函数
* @param [in] ptrCurSock 收取数据的Socket
* @param [in] ptrSockStreamBuff 收取数据的Buff对象
* @param [in] error 错误信息
*/
void server_thread_recv_handler(
std::shared_ptr<MySocket> ptrCurSock,
std::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff,
const boost::system::error_code& error // Result of operation.
)
{
// 5.2.1 先接收buff长度
if (ptrSockStreamBuff->size() == sizeof(size_t))
{
const size_t* pLen = boost::asio::buffer_cast<const size_t*>(ptrSockStreamBuff->data());
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(*pLen), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
// 移出socket buff
ptrSockStreamBuff->consume(sizeof(size_t));
return;
}
// 5.2.2 再接收buff数据
if (ptrSockStreamBuff->size() > 0)
{
// 5.2.3 处理数据
boost::asio::streambuf::const_buffers_type bufs = ptrSockStreamBuff->data();
std::string strOut(boost::asio::buffers_begin(bufs), boost::asio::buffers_begin(bufs) + ptrSockStreamBuff->size());
strOut += (char)(0);
std::cout<< "==== Server Thread "<< std::this_thread::get_id()<< " Sock "<< ptrCurSock->native_handle()<< " Recv Data: "<< strOut<< std::endl;
// // 5.2.4 移出socket buff
ptrSockStreamBuff->consume(ptrSockStreamBuff->size());
// Sleep 500 毫秒。模拟正忙
// std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 5.2.5 回包
// 组装数据
std::stringstream stServerSendBuf;
stServerSendBuf << "Server Thread "<< std::this_thread::get_id()<< " Say Got{ "<< strOut<< " }";
std::shared_ptr<std::string> ptrBuffStr = std::shared_ptr<std::string>(new std::string(stServerSendBuf.str()));
// 先发Buff长度
size_t uBufLen = ptrBuffStr->size();
boost::asio::async_write(*ptrCurSock, boost::asio::buffer(&uBufLen, sizeof(uBufLen)), boost::asio::transfer_exactly(sizeof(uBufLen)),
boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
);
// 再发数据
boost::asio::async_write(*ptrCurSock, boost::asio::buffer(*ptrBuffStr), boost::asio::transfer_exactly(ptrBuffStr->size()),
boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
);
// 5.2.6 重新监听
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
g_iPkgPerSecond.fetch_add(1);
}
// 5.2.7 关闭退出
if (error == boost::asio::error::eof)
{
std::cout<< "xxxx Server Thread "<< std::this_thread::get_id()<< " Sock Release: "<< ptrCurSock->native_handle()<< std::endl;
g_iCurClientNumber.fetch_sub(1);
return;
}
if (error)
{
std::cout<< "==== Server Thread "<< std::this_thread::get_id()<< " Recv Failed: "<< error.message()<< std::endl;
return;
}
}
/**
* 服务器异步接受Socket回调函数
* @param [in] stMainService 服务对象
* @param [in] stAccepter 接受Socket
* @param [in] ptrCurSock 建立连接的Socket
* @param [in] error 错误信息
*/
void server_thread_accept_handler(
boost::asio::io_service& stMainService,
boost::asio::ip::tcp::acceptor& stAccepter,
std::shared_ptr<MySocket>& ptrCurSock,
const boost::system::error_code& error // Result of operation.
)
{
if (error)
{
std::cout<< "++++ Server Accept Failed: "<< error.message()<< std::endl;
return;
}
std::cout<< "++++ Server Thread "<< std::this_thread::get_id()<< " Accept a socket: "<< ptrCurSock->native_handle()<< std::endl;
std::cout<< "\tServer Thread Local End Point:"<< std::endl<<
"\t\tServer Thread Address: "<< ptrCurSock->local_endpoint().address().to_string()<< std::endl<<
"\t\tServer Thread Port: "<< ptrCurSock->local_endpoint().port()<< std::endl<<
"\t\tServer Thread Protocol: "<< ptrCurSock->local_endpoint().protocol().protocol()<< std::endl;
std::cout<< "\tServer Thread Remote End Point:"<< std::endl<<
"\t\tServer Thread Address: "<< ptrCurSock->remote_endpoint().address().to_string()<< std::endl<<
"\t\tServer Thread Port: "<< ptrCurSock->remote_endpoint().port()<< std::endl<<
"\t\tServer Thread Protocol: "<< ptrCurSock->remote_endpoint().protocol().protocol()<< std::endl;
g_iCurClientNumber.fetch_add(1);
// Step 5.1 接受链入的Socket
// Step 5.2 设置Socket接收数据回调
std::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff = std::shared_ptr<boost::asio::streambuf>(new boost::asio::streambuf());
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
// Step 5.3 创建Socket, 接受新Socket
ptrCurSock = std::shared_ptr<MySocket>(
new MySocket(stMainService)
);
stAccepter.async_accept(*ptrCurSock, boost::bind(
server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
));
}
/**
* 服务器工作线程
* @param [in] stMainService 服务对象
*/
void server_thread(boost::asio::io_service& stMainService)
{
std::cout<< "==== Server Thread "<< std::this_thread::get_id()<< " Started"<< std::endl;
while(bRunning) {
try {
stMainService.run(); // 这样,就加到线程池中去了
} catch (boost::system::error_code& e) {
std::cout<< "Service Exception: "<< e.message()<< std::endl;
} catch (...) {
std::cout<< "Service Unknown Exception"<< std::endl;
}
}
}
/**
* 服务器主线程
* @param [in] server_num 服务线程数
*/
void server_main_thread(int server_num)
{
// Step 1. 创建服务对象
static boost::asio::io_service stMainService;
// Step 2. 创建地址生成器及生成地址
boost::asio::ip::tcp::resolver stResolver(stMainService);
// 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("10.24.248.86", "8731"));
// Step 3. 创建接收器
boost::asio::ip::tcp::acceptor stAccepter(stMainService, stEndpoint);
// Step 4. 创建Socket
std::shared_ptr<MySocket> ptrCurSock = std::shared_ptr<MySocket>(
new MySocket(stMainService)
);
stAccepter.async_accept(*ptrCurSock, boost::bind(
server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
));
// Step 5. 创建线程并加入到服务线程池
std::vector<std::thread*> stls;
for (int i = 0; i < server_num; ++ i)
{
// 打印输出错开
std::this_thread::sleep_for(std::chrono::milliseconds(1));
stls.push_back(
new std::thread(server_thread, boost::ref(stMainService))
);
}
boost::asio::deadline_timer stDeadlineTimer(stMainService);
stDeadlineTimer.expires_from_now(boost::posix_time::seconds(1));
stDeadlineTimer.async_wait(boost::bind(deadline_timer_handler, boost::ref(stDeadlineTimer), _1));
// Ctrl + C退出服务
boost::asio::signal_set stSigs(stMainService);
stSigs.add(SIGINT);
stSigs.add(SIGTERM);
stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));
server_thread(stMainService);
// Step 6. 阻塞主线程,等待服务线程退出
for (size_t i = 0; i < stls.size(); ++ i)
{
stls[i]->join();
delete stls[i];
}
}
void print_asio_info();
int main(int argc, char* argv[]) {
print_asio_info();
g_iPkgPerSecond.store(0);
g_iCurClientNumber.store(0);
int iSvrThrNum = 16;
if (argc > 1)
iSvrThrNum = atoi(argv[1]);
server_main_thread(iSvrThrNum);
print_asio_info();
return 0;
}
void print_asio_info()
{
#if defined(BOOST_ASIO_HAS_IOCP)
std::cout<< "Boost.Asio using iocp"<< std::endl;
#if defined(BOOST_ASIO_HAS_SERIAL_PORT)
std::cout<< "\tSerial Port: enabled"<< std::endl;
#else
std::cout<< "\tSerial Port: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE)
std::cout<< "\tStream handle: enabled"<< std::endl;
#else
std::cout<< "\tStream handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_RANDOM_ACCESS_HANDLE)
std::cout<< "\tWindows Random Access Handle: enabled"<< std::endl;
#else
std::cout<< "\tWindows Random Access Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OBJECT_HANDLE)
std::cout<< "\tObject Handle: enabled"<< std::endl;
#else
std::cout<< "\tObject Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
std::cout<< "\tWindows Overlapped Ptr: enabled"<< std::endl;
#else
std::cout<< "\tWindows Overlapped Ptr: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_EPOLL)
std::cout<< "Boost.Asio using epoll."<< std::endl;
#if defined(BOOST_ASIO_HAS_EVENTFD)
std::cout<< "\teventfd: enabled"<< std::endl;
#else
std::cout<< "\teventfd: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_TIMERFD)
std::cout<< "\timerfd: enabled"<< std::endl;
#else
std::cout<< "\timerfd: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_KQUEUE)
std::cout<< "Boost.Asio using kqueue"<< std::endl;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
std::cout<< "Boost.Asio using solaris: /dev/poll"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
std::cout<< "Posix Stream Descriptor: enabled"<< std::endl;
#else
std::cout<< "Posix Stream Descriptor: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::cout<< "Unix domain socket: enabled"<< std::endl;
#else
std::cout<< "Unix domain socket: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_SIGACTION)
std::cout<< "sigaction: enabled"<< std::endl;
#else
std::cout<< "sigaction: disabled"<< std::endl;
#endif
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment