Skip to content

Instantly share code, notes, and snippets.

@chenshuo
Created September 4, 2010 07:13
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save chenshuo/564985 to your computer and use it in GitHub Desktop.
Save chenshuo/564985 to your computer and use it in GitHub Desktop.
ping pong test for muduo network library
#!/bin/sh
# Ping-pong sweep: for every combination of session count and thread count,
# start a fresh server in the background, run the client against it, and
# kill the server once the client has returned.

# Remove any server left over from an earlier run.
killall server

# Last argument handed to the client; can be overridden from the environment.
timeout=${timeout:-100}
bufsize=16384

for nosessions in 100 1000
do
  for nothreads in 1 2 3 4
  do
    # Pause before each run.
    sleep 5
    echo "Bufsize: $bufsize Threads: $nothreads Sessions: $nosessions"

    # Launch the server in the background and remember its pid.
    ./server 0.0.0.0 55555 $nothreads $bufsize &
    srvpid=$!

    # The client runs in the foreground.
    ./client 127.0.0.1 55555 $nothreads $bufsize $nosessions $timeout

    kill -9 $srvpid
  done
done
// Benchmark inspired by libevent2/test/bench.c
// See also: http://libev.schmorp.de/bench.html
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <boost/bind.hpp>
#include <stdio.h>
#include <sys/resource.h>
#include <sys/socket.h>
using namespace muduo;
using namespace muduo::net;
// Descriptors created by socketpair() in main(): pair i occupies slots 2*i
// and 2*i+1. Slot 2*i is the end watched by a Channel; slot 2*i+1 is the end
// that the benchmark send()s to.
std::vector<int> g_pipes;
// Number of socket pairs (-n option, default 100).
int numPipes;
// Number of one-byte messages runOnce() sends before entering the loop
// (-a option, default 1).
int numActive;
// Value loaded into g_writes at the start of every runOnce()
// (-w option, default 100).
int numWrites;
// The event loop created in main(); readCallback() calls quit() on it.
EventLoop* g_loop;
// One heap-allocated Channel per socket pair, created and deleted in main().
std::vector<Channel*> g_channels;
// Per-run counters, reset by runOnce():
//   g_reads  - running sum of the recv() return values in readCallback()
//   g_writes - how many more messages readCallback() may still forward
//   g_fired  - messages sent so far (initial ones plus forwarded ones)
int g_reads, g_writes, g_fired;
// Read handler attached to every Channel by runOnce().
//
// Consumes one byte from `fd` and adds recv()'s return value to g_reads.
// While g_writes is still positive, a one-byte message is forwarded to the
// write end of the next socket pair (wrapping past the last pair back to the
// first). Once the number of messages sent equals the number accounted for
// as read, the event loop is asked to quit.
//
//   fd  - descriptor to read from (the Channel's own fd)
//   idx - index of the socket pair this Channel belongs to
void readCallback(Timestamp, int fd, int idx)
{
  char byte;
  const int received = static_cast<int>(::recv(fd, &byte, sizeof(byte), 0));
  g_reads += received;

  if (g_writes > 0)
  {
    // Successor pair, wrapping around at numPipes.
    const int next = (idx + 1 >= numPipes) ? (idx + 1 - numPipes) : (idx + 1);
    ::send(g_pipes[2 * next + 1], "m", 1, 0);
    --g_writes;
    ++g_fired;
  }

  if (g_fired == g_reads)
  {
    g_loop->quit();
  }
}
std::pair<int, int> runOnce()
{
Timestamp beforeInit(Timestamp::now());
for (int i = 0; i < numPipes; ++i)
{
Channel* channel = g_channels[i];
channel->setReadCallback(boost::bind(readCallback, _1, channel->fd(), i));
channel->enableReading();
}
int space = numPipes / numActive;
space *= 2;
for (int i = 0; i < numActive; ++i)
{
::send(g_pipes[i * space + 1], "m", 1, 0);
}
g_fired = numActive;
g_reads = 0;
g_writes = numWrites;
Timestamp beforeLoop(Timestamp::now());
g_loop->loop();
Timestamp end(Timestamp::now());
int iterTime = static_cast<int>(end.microSecondsSinceEpoch() - beforeInit.microSecondsSinceEpoch());
int loopTime = static_cast<int>(end.microSecondsSinceEpoch() - beforeLoop.microSecondsSinceEpoch());
return std::make_pair(iterTime, loopTime);
}
// Entry point of the event-loop benchmark.
//
// Command-line options:
//   -n <count>  number of socket pairs                   (default 100)
//   -a <count>  number of initially active writers       (default 1)
//   -w <count>  number of messages forwarded per run     (default 100)
//
// Creates the socket pairs, wraps the read end of each one in a Channel,
// executes runOnce() 25 times and prints both timings of every run.
// Returns 1 on an invalid option, an invalid option value, or when a socket
// pair cannot be created.
int main(int argc, char* argv[])
{
  numPipes = 100;
  numActive = 1;
  numWrites = 100;

  int c;
  while ((c = getopt(argc, argv, "n:a:w:")) != -1)
  {
    switch (c)
    {
      case 'n':
        numPipes = atoi(optarg);
        break;
      case 'a':
        numActive = atoi(optarg);
        break;
      case 'w':
        numWrites = atoi(optarg);
        break;
      default:
        // getopt() returns '?' for every rejected option; the character that
        // was actually rejected is reported through optopt.
        fprintf(stderr, "Illegal argument \"%c\"\n", optopt);
        return 1;
    }
  }

  // runOnce() divides by numActive and always writes to g_pipes[1], so both
  // values have to be at least 1. A non-positive numActive would otherwise
  // divide by zero or leave the event loop waiting for a message that is
  // never sent.
  if (numPipes <= 0 || numActive <= 0)
  {
    fprintf(stderr, "-n and -a must be positive\n");
    return 1;
  }

  // Two descriptors per pair plus some headroom for everything else.
  struct rlimit rl;
  rl.rlim_cur = rl.rlim_max = numPipes * 2 + 50;
  if (::setrlimit(RLIMIT_NOFILE, &rl) == -1)
  {
    perror("setrlimit");
    //return 1;  // comment out this line if under valgrind
  }

  g_pipes.resize(2 * numPipes);
  for (int i = 0; i < numPipes; ++i)
  {
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, &g_pipes[i*2]) == -1)
    {
      perror("socketpair");
      return 1;
    }
  }

  EventLoop loop;
  g_loop = &loop;

  // One Channel per pair, watching the even-indexed descriptor.
  for (int i = 0; i < numPipes; ++i)
  {
    Channel* channel = new Channel(&loop, g_pipes[i*2]);
    g_channels.push_back(channel);
  }

  for (int i = 0; i < 25; ++i)
  {
    std::pair<int, int> t = runOnce();
    printf("%8d %8d\n", t.first, t.second);
  }

  for (int i = 0; i < numPipes; ++i)
  {
    delete g_channels[i];
  }
}
#include <muduo/net/TcpClient.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <utility>
#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class Client;

// One client-side connection of the ping-pong test.
//
// Owns a TcpClient and keeps per-connection counters. Every message that
// arrives is counted and handed straight back to the connection's send().
// Connection up/down events are reported to the owning Client (see
// onConnection(), defined after Client).
class Session : boost::noncopyable
{
 public:
  // loop       - event loop the underlying TcpClient is created on
  // serverAddr - address to connect to
  // name       - name passed to the TcpClient
  // owner      - Client that is notified of connects and disconnects
  Session(EventLoop* loop, const InetAddress& serverAddr, const string& name, Client* owner)
    : client_(loop, serverAddr, name),
      owner_(owner),
      bytesRead_(0),
      bytesWritten_(0),
      messagesRead_(0)
  {
    client_.setConnectionCallback(
        boost::bind(&Session::onConnection, this, _1));
    client_.setMessageCallback(
        boost::bind(&Session::onMessage, this, _1, _2, _3));
  }

  // Total number of readable bytes seen by onMessage().
  int64_t bytesRead() const { return bytesRead_; }

  // Number of times onMessage() has been invoked.
  int64_t messagesRead() const { return messagesRead_; }

  // Initiates the connection.
  void start() { client_.connect(); }

  // Requests disconnection.
  void stop() { client_.disconnect(); }

 private:
  void onConnection(const TcpConnectionPtr& conn);

  // Accounts for the received data, then passes the buffer back to send().
  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
  {
    // Sample the size once, before send() gets the buffer.
    const int64_t readable = static_cast<int64_t>(buf->readableBytes());
    ++messagesRead_;
    bytesRead_ += readable;
    bytesWritten_ += readable;
    conn->send(buf);
  }

  TcpClient client_;
  Client* owner_;
  int64_t bytesRead_;
  int64_t bytesWritten_;
  int64_t messagesRead_;
};
// Drives the client side of the ping-pong test.
//
// Creates `sessionCount` Session objects, spread over the loops handed out
// by an EventLoopThreadPool, and schedules handleTimeout() on the base loop
// after `timeout`. When the timer fires every session is asked to stop; once
// the last one has disconnected the totals are logged and the base loop is
// told to quit.
class Client : boost::noncopyable
{
 public:
  // loop         - base event loop; also runs the timeout timer
  // serverAddr   - server address every session connects to
  // blockSize    - size in bytes of the message each session sends on connect
  // sessionCount - number of sessions to create
  // timeout      - delay handed to runAfter(); also the divisor of the
  //                reported "MiB/s" figure (presumably seconds)
  // threadCount  - number of pool threads; values <= 1 leave the pool's
  //                thread count at its default
  Client(EventLoop* loop,
         const InetAddress& serverAddr,
         int blockSize,
         int sessionCount,
         int timeout,
         int threadCount)
    : loop_(loop),
      threadPool_(loop),
      sessionCount_(sessionCount),
      timeout_(timeout)
  {
    loop->runAfter(timeout, boost::bind(&Client::handleTimeout, this));
    if (threadCount > 1)
    {
      threadPool_.setThreadNum(threadCount);
    }
    threadPool_.start();

    // The payload must be complete before any session can connect and
    // request it through message().
    for (int i = 0; i < blockSize; ++i)
    {
      message_.push_back(static_cast<char>(i % 128));
    }

    for (int i = 0; i < sessionCount; ++i)
    {
      char buf[32];
      snprintf(buf, sizeof buf, "C%05d", i);
      Session* session = new Session(threadPool_.getNextLoop(), serverAddr, buf, this);
      session->start();
      sessions_.push_back(session);
    }
  }

  // Payload that every session sends once it is connected.
  const string& message() const
  {
    return message_;
  }

  // Called by a Session when its connection comes up.
  void onConnect()
  {
    if (numConnected_.incrementAndGet() == sessionCount_)
    {
      LOG_WARN << "all connected";
    }
  }

  // Called by a Session when its connection goes down. The call that brings
  // the connected count to zero logs the aggregate statistics and quits the
  // base loop.
  void onDisconnect()
  {
    if (numConnected_.decrementAndGet() == 0)
    {
      LOG_WARN << "all disconnected";

      int64_t totalBytesRead = 0;
      int64_t totalMessagesRead = 0;
      for (boost::ptr_vector<Session>::iterator it = sessions_.begin();
           it != sessions_.end(); ++it)
      {
        totalBytesRead += it->bytesRead();
        totalMessagesRead += it->messagesRead();
      }

      LOG_WARN << totalBytesRead << " total bytes read";
      LOG_WARN << totalMessagesRead << " total messages read";
      LOG_WARN << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead)
               << " average message size";
      // Compute the divisor in double: timeout_ * 1024 * 1024 evaluated in
      // int overflows (undefined behaviour) once timeout_ reaches 2048.
      LOG_WARN << static_cast<double>(totalBytesRead) / (static_cast<double>(timeout_) * 1024 * 1024)
               << " MiB/s throughput";
      loop_->quit();
    }
  }

 private:
  // Timer callback: asks every session to disconnect.
  void handleTimeout()
  {
    LOG_WARN << "stop";
    std::for_each(sessions_.begin(), sessions_.end(),
                  boost::mem_fn(&Session::stop));
  }

  EventLoop* loop_;
  EventLoopThreadPool threadPool_;
  int sessionCount_;
  int timeout_;
  boost::ptr_vector<Session> sessions_;
  string message_;
  AtomicInt32 numConnected_;
};
// Connection callback of a Session; invoked with the connection either up
// or down.
//
// Down: notifies the owning Client and does nothing else.
// Up:   enables TCP_NODELAY on the connection, sends the owner's message as
//       the opening message, and notifies the owning Client.
void Session::onConnection(const TcpConnectionPtr& conn)
{
  if (!conn->connected())
  {
    owner_->onDisconnect();
    return;
  }

  conn->setTcpNoDelay(true);
  conn->send(owner_->message());
  owner_->onConnect();
}
// Entry point of the ping-pong client.
//
// Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time>
//
// Builds a Client from the arguments and runs the base event loop until the
// Client quits it. Returns 1 when the argument count is wrong, so calling
// scripts can detect a bad invocation; returns 0 otherwise.
int main(int argc, char* argv[])
{
  if (argc != 7)
  {
    fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> ");
    fprintf(stderr, "<sessions> <time>\n");
    return 1;
  }

  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  // From here on only warnings and above are logged.
  Logger::setLogLevel(Logger::WARN);

  const char* ip = argv[1];
  uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
  int threadCount = atoi(argv[3]);
  int blockSize = atoi(argv[4]);
  int sessionCount = atoi(argv[5]);
  int timeout = atoi(argv[6]);

  EventLoop loop;
  InetAddress serverAddr(ip, port);

  Client client(&loop, serverAddr, blockSize, sessionCount, timeout, threadCount);
  loop.loop();
  return 0;
}
#include <muduo/net/TcpServer.h>
#include <muduo/base/Atomic.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <utility>
#include <mcheck.h>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
// Connection callback of the server.
//
// The callback fires both when a connection is established and when it goes
// down; TCP_NODELAY is only meaningful, and only set, for a connection that
// is currently up.
void onConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    conn->setTcpNoDelay(true);
  }
}
// Message callback of the server: hands the buffer holding the received data
// straight to conn->send() (presumably echoing it back to the peer unchanged;
// the exact semantics of send(Buffer*) are defined by muduo).
// The Timestamp argument is not used.
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
conn->send(buf);
}
// Entry point of the ping-pong server.
//
// Usage: server <address> <port> <threads>
// Arguments after the third are accepted and ignored.
//
// Sets up a TcpServer named "PingPong" with the connection and message
// callbacks above and runs the event loop. Returns 1 when too few arguments
// are given, so calling scripts can detect a bad invocation.
int main(int argc, char* argv[])
{
  if (argc < 4)
  {
    fprintf(stderr, "Usage: server <address> <port> <threads>\n");
    return 1;
  }

  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  // From here on only warnings and above are logged.
  Logger::setLogLevel(Logger::WARN);

  const char* ip = argv[1];
  uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
  InetAddress listenAddr(ip, port);
  int threadCount = atoi(argv[3]);

  EventLoop loop;

  TcpServer server(&loop, listenAddr, "PingPong");
  server.setConnectionCallback(onConnection);
  server.setMessageCallback(onMessage);

  // A thread count of 1 or less leaves the server's thread setting untouched.
  if (threadCount > 1)
  {
    server.setThreadNum(threadCount);
  }

  server.start();
  loop.loop();
  return 0;
}
#!/bin/sh
# Single-threaded ping-pong sweep over the number of sessions. Server and
# client are each started under taskset with a different CPU list (1 and 2).

# Remove any server left over from an earlier run.
killall server

# Last argument handed to the client; can be overridden from the environment.
timeout=${timeout:-100}
bufsize=16384
nothreads=1

for nosessions in 1 10 100 1000 10000
do
  # Pause before each run.
  sleep 5
  echo "Bufsize: $bufsize Threads: $nothreads Sessions: $nosessions"

  # Launch the server in the background and remember its pid.
  taskset -c 1 ./server 0.0.0.0 55555 $nothreads $bufsize &
  srvpid=$!

  # The client runs in the foreground.
  taskset -c 2 ./client 127.0.0.1 55555 $nothreads $bufsize $nosessions $timeout

  kill -9 $srvpid
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment