Skip to content

Instantly share code, notes, and snippets.

@kzemek
Created August 14, 2015 17:08
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save kzemek/166e2af5f799d4f833a3 to your computer and use it in GitHub Desktop.
#include <asio.hpp>
#include <asio/ssl.hpp>
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
class IoServices {
public:
IoServices(std::size_t number)
: m_ioServices(number)
{
for (auto &ioService : m_ioServices) {
m_idleWorks.emplace_back(ioService);
m_threads.emplace_back([&] { ioService.run(); });
}
}
void stop()
{
for (auto &ioService : m_ioServices)
ioService.stop();
for (auto &thread : m_threads)
if (thread.joinable())
thread.join();
}
~IoServices() { stop(); }
asio::io_service &get()
{
return m_ioServices[(m_nextService++ % m_ioServices.size())];
}
private:
std::atomic<std::size_t> m_nextService{0};
std::vector<asio::io_service> m_ioServices;
std::vector<asio::io_service::work> m_idleWorks;
std::vector<std::thread> m_threads;
};
class ServerConnection {
public:
ServerConnection(asio::io_service &ioService, asio::ssl::context &context,
std::size_t messageSize)
: m_socket{ioService, context}
, m_buffer(messageSize)
{
++s_runningConnections;
}
~ServerConnection() { --s_runningConnections; }
asio::ssl::stream<asio::ip::tcp::socket>::lowest_layer_type &socket()
{
return m_socket.lowest_layer();
}
void start(std::shared_ptr<ServerConnection> self, std::size_t messages)
{
m_socket.async_handshake(asio::ssl::stream_base::server,
[=](const asio::error_code &) { asyncRead(self, messages); });
}
static std::size_t runningConnections() { return s_runningConnections; }
private:
void asyncRead(std::shared_ptr<ServerConnection> self, std::size_t messages)
{
asio::async_read(m_socket, asio::buffer(m_buffer),
[=](const asio::error_code &, std::size_t) {
if (messages > 1)
asyncRead(self, messages - 1);
});
}
static std::atomic<std::size_t> s_runningConnections;
asio::ssl::stream<asio::ip::tcp::socket> m_socket;
std::vector<char> m_buffer;
};
std::atomic<std::size_t> ServerConnection::s_runningConnections{0};
class Server {
public:
Server(IoServices &ioServices, std::size_t connections,
std::size_t messages, std::size_t messageSize)
: m_ioServices{ioServices}
, m_messages{messages}
, m_messageSize{messageSize}
{
m_context.use_certificate_chain_file("server.pem");
m_context.use_private_key_file("server.key", asio::ssl::context::pem);
asyncAccept(connections);
}
private:
void asyncAccept(std::size_t connections)
{
auto conn = std::make_shared<ServerConnection>(
m_ioServices.get(), m_context, m_messageSize);
m_acceptor.async_accept(conn->socket(), [=](const asio::error_code &) {
conn->start(conn, m_messages);
if (connections > 1)
asyncAccept(connections - 1);
});
}
IoServices &m_ioServices;
std::size_t m_messages;
std::size_t m_messageSize;
asio::ssl::context m_context{asio::ssl::context::tlsv12_server};
asio::ip::tcp::acceptor m_acceptor{
m_ioServices.get(), {asio::ip::tcp::v4(), 5555}};
};
class ClientConnection {
public:
ClientConnection(asio::io_service &ioService,
asio::ip::tcp::resolver::iterator iterator, std::size_t messageSize)
: m_socket{ioService, m_context}
, m_buffer(messageSize)
{
asio::connect(m_socket.lowest_layer(), iterator);
m_socket.handshake(asio::ssl::stream_base::client);
}
void asyncSend(std::size_t messages)
{
asio::async_write(m_socket, asio::buffer(m_buffer),
[=](const asio::error_code &, std::size_t) {
if (messages > 1)
asyncSend(messages - 1);
});
}
private:
asio::ssl::context m_context{asio::ssl::context::tlsv12_client};
asio::ssl::stream<asio::ip::tcp::socket> m_socket;
std::vector<char> m_buffer;
};
std::vector<std::shared_ptr<ClientConnection>> createClients(
IoServices &ioServices, std::size_t messageSize, std::size_t number)
{
asio::ip::tcp::resolver resolver{ioServices.get()};
auto iterator = resolver.resolve({"127.0.0.1", "5555"});
std::vector<std::shared_ptr<ClientConnection>> clients;
std::generate_n(std::back_inserter(clients), number, [&] {
return std::make_shared<ClientConnection>(
ioServices.get(), iterator, messageSize);
});
return clients;
}
std::chrono::milliseconds measureTransferTime(
std::vector<std::shared_ptr<ClientConnection>> &clients,
std::size_t messages)
{
auto startTime = std::chrono::steady_clock::now();
for (auto &client : clients)
client->asyncSend(messages);
while (ServerConnection::runningConnections() != 0)
std::this_thread::sleep_for(std::chrono::milliseconds{10});
auto stopTime = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(
stopTime - startTime);
}
int main(int argc, char *argv[])
{
if (argc < 5) {
std::cout << "Usage: " << argv[0]
<< " threads connections messages messageSize" << std::endl;
return 1;
}
std::size_t threadsNo = std::atoll(argv[1]);
std::size_t connections = std::atoll(argv[2]);
std::size_t messages = std::atoll(argv[3]);
std::size_t messageSize = std::atoll(argv[4]);
IoServices ioServices(threadsNo);
Server server{ioServices, connections, messages, messageSize};
auto clients = createClients(ioServices, messageSize, connections);
auto duration = measureTransferTime(clients, messages);
auto seconds = static_cast<double>(duration.count()) / 1000;
auto megabytes =
static_cast<double>(connections * messages * messageSize) / 1024 / 1024;
std::cout << megabytes << " megabytes sent and received in " << seconds
<< " seconds. (" << (megabytes / seconds) << " MB/s)"
<< std::endl;
ioServices.stop();
return 0;
}
--- benchmark_naive.cpp 2015-08-14 18:48:32.000000000 +0200
+++ benchmark_asio.cpp 2015-08-14 19:04:52.000000000 +0200
@@ -10,6 +10,41 @@
#include <thread>
#include <vector>
+class IoServices {
+public:
+ IoServices(std::size_t number)
+ : m_ioServices(number)
+ {
+ for (auto &ioService : m_ioServices) {
+ m_idleWorks.emplace_back(ioService);
+ m_threads.emplace_back([&] { ioService.run(); });
+ }
+ }
+
+ void stop()
+ {
+ for (auto &ioService : m_ioServices)
+ ioService.stop();
+
+ for (auto &thread : m_threads)
+ if (thread.joinable())
+ thread.join();
+ }
+
+ ~IoServices() { stop(); }
+
+ asio::io_service &get()
+ {
+ return m_ioServices[(m_nextService++ % m_ioServices.size())];
+ }
+
+private:
+ std::atomic<std::size_t> m_nextService{0};
+ std::vector<asio::io_service> m_ioServices;
+ std::vector<asio::io_service::work> m_idleWorks;
+ std::vector<std::thread> m_threads;
+};
+
class ServerConnection {
public:
ServerConnection(asio::io_service &ioService, asio::ssl::context &context,
@@ -54,9 +89,9 @@
class Server {
public:
- Server(asio::io_service &ioService, std::size_t connections,
+ Server(IoServices &ioServices, std::size_t connections,
std::size_t messages, std::size_t messageSize)
- : m_ioService{ioService}
+ : m_ioServices{ioServices}
, m_messages{messages}
, m_messageSize{messageSize}
{
@@ -69,7 +104,7 @@
void asyncAccept(std::size_t connections)
{
auto conn = std::make_shared<ServerConnection>(
- m_ioService, m_context, m_messageSize);
+ m_ioServices.get(), m_context, m_messageSize);
m_acceptor.async_accept(conn->socket(), [=](const asio::error_code &) {
conn->start(conn, m_messages);
@@ -78,13 +113,13 @@
});
}
- asio::io_service &m_ioService;
+ IoServices &m_ioServices;
std::size_t m_messages;
std::size_t m_messageSize;
asio::ssl::context m_context{asio::ssl::context::tlsv12_server};
asio::ip::tcp::acceptor m_acceptor{
- m_ioService, {asio::ip::tcp::v4(), 5555}};
+ m_ioServices.get(), {asio::ip::tcp::v4(), 5555}};
};
class ClientConnection {
@@ -113,26 +148,16 @@
std::vector<char> m_buffer;
};
-std::vector<std::thread> createThreads(
- asio::io_service &ioService, std::size_t number)
-{
- std::vector<std::thread> threads;
- std::generate_n(std::back_inserter(threads), number,
- [&] { return std::thread{[&] { ioService.run(); }}; });
-
- return threads;
-}
-
std::vector<std::shared_ptr<ClientConnection>> createClients(
- asio::io_service &ioService, std::size_t messageSize, std::size_t number)
+ IoServices &ioServices, std::size_t messageSize, std::size_t number)
{
- asio::ip::tcp::resolver resolver{ioService};
+ asio::ip::tcp::resolver resolver{ioServices.get()};
auto iterator = resolver.resolve({"127.0.0.1", "5555"});
std::vector<std::shared_ptr<ClientConnection>> clients;
std::generate_n(std::back_inserter(clients), number, [&] {
return std::make_shared<ClientConnection>(
- ioService, iterator, messageSize);
+ ioServices.get(), iterator, messageSize);
});
return clients;
@@ -170,12 +195,9 @@
std::size_t messages = std::atoll(argv[3]);
std::size_t messageSize = std::atoll(argv[4]);
- asio::io_service ioService;
- asio::io_service::work idleWork{ioService};
-
- auto threads = createThreads(ioService, threadsNo);
- Server server{ioService, connections, messages, messageSize};
- auto clients = createClients(ioService, messageSize, connections);
+ IoServices ioServices(threadsNo);
+ Server server{ioServices, connections, messages, messageSize};
+ auto clients = createClients(ioServices, messageSize, connections);
auto duration = measureTransferTime(clients, messages);
auto seconds = static_cast<double>(duration.count()) / 1000;
@@ -186,9 +208,7 @@
<< " seconds. (" << (megabytes / seconds) << " MB/s)"
<< std::endl;
- ioService.stop();
- for (auto &thread : threads)
- thread.join();
+ ioServices.stop();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment