Skip to content

Instantly share code, notes, and snippets.

@danieljoos
Last active August 29, 2015 14:25
Show Gist options
  • Save danieljoos/45e5e8c5146504d16754 to your computer and use it in GitHub Desktop.
Save danieljoos/45e5e8c5146504d16754 to your computer and use it in GitHub Desktop.
Simple performance measurement of produce requests
#include <iostream>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/random.hpp>
#include <libkafka_asio/libkafka_asio.h>
using libkafka_asio::Client;
using libkafka_asio::ProduceRequest;
using libkafka_asio::ProduceResponse;
using libkafka_asio::Bytes;
using libkafka_asio::MessageSet;
using libkafka_asio::CompressMessageSet;
using libkafka_asio::constants::kCompressionGZIP;
using boost::chrono::high_resolution_clock;
using boost::chrono::duration_cast;
using boost::chrono::microseconds;
//
// Adds the plain messages to the request
//
template< typename Traits >
struct DefaultRequestGenerator
{
static void Generate(ProduceRequest& request, Bytes& data)
{
for (int i = 0; i < Traits::kMessagesPerRequest; ++i)
{
request.AddValue(data, Traits::kTopicName(), Traits::kTopicPartition);
}
}
};
//
// Adds a compressed message set to the request
//
template< typename Traits >
struct CompressedRequestGenerator
{
static void Generate(ProduceRequest& request, Bytes& data)
{
MessageSet msg_set(Traits::kMessagesPerRequest);
for (MessageSet::iterator iter = msg_set.begin(); iter != msg_set.end(); ++iter)
{
iter->mutable_value() = data;
}
boost::system::error_code ec;
request.AddMessage(
CompressMessageSet(msg_set, kCompressionGZIP, ec),
Traits::kTopicName(),
Traits::kTopicPartition);
}
};
//
// Some test parameters
//
struct Traits
{
static const int kMaxRequests = 1000;
static const int kMessagesPerRequest = 1000;
static const size_t kTestDataLength = 100;
static const char* kTopicName() { return "test3"; }
static const int kTopicPartition = 0;
static const char* kBroker() { return "localhost:9092"; }
static const int kRequiredAcks = 0;
typedef DefaultRequestGenerator<Traits> RequestGeneratorType;
};
//
// The test producer
//
class Test
{
typedef boost::scoped_ptr<Client> ScopedClientType;
typedef boost::asio::io_service IoServiceType;
public:
Test()
{
// Create some random ASCII test data
boost::random::mt19937 gen;
boost::random::uniform_int_distribution<> ascii_dist(32, 126);
test_data_.reset(new Bytes::element_type(Traits::kTestDataLength, 'x'));
for (Bytes::element_type::iterator iter = test_data_->begin();
iter != test_data_->end();
++iter)
{
*iter = ascii_dist(gen);
}
}
//
// Opens a connection to the broker and runs until all requests have been processed
//
void Run()
{
Client::Configuration configuration;
configuration.auto_connect = true;
configuration.AddBrokerFromString(Traits::kBroker());
client_.reset(new Client(io_service_, configuration));
request_count_ = 0;
PrepareNextRequest();
io_service_.run();
}
long request_count() const
{
return request_count_;
}
private:
//
// Creates a new ProduceRequest, fills it with data and schedules sending it to the broker
//
void PrepareNextRequest()
{
if (request_count_ >= Traits::kMaxRequests)
{
client_->Close();
return;
}
ProduceRequest request;
request.set_required_acks(Traits::kRequiredAcks);
Traits::RequestGeneratorType::Generate(request, test_data_);
client_->AsyncRequest(request, boost::bind(&Test::HandleRequest, this, ::_1, ::_2));
}
//
// Handles the produce request and triggers the creation of the next request
//
void HandleRequest(
const Client::ErrorCodeType& err,
const ProduceResponse::OptionalType& response)
{
if (err)
{
std::cerr << "Error: " << boost::system::system_error(err).what() << std::endl;
return;
}
++request_count_;
PrepareNextRequest();
}
IoServiceType io_service_;
ScopedClientType client_;
long request_count_;
Bytes test_data_;
microseconds time_waited_;
high_resolution_clock::time_point start_time_point_;
};
int main(int argc, char **argv)
{
// Create the test runner
Test test;
// Execute the test and measure the execution time
high_resolution_clock::time_point time_start = high_resolution_clock::now();
test.Run();
high_resolution_clock::time_point time_end = high_resolution_clock::now();
//
// Show some output on the console
//
microseconds duration = duration_cast<microseconds>(time_end - time_start);
double requests_per_second = (double)test.request_count() * 1000000. / (double)duration.count();
double messages_per_second = requests_per_second * (double)Traits::kMessagesPerRequest;
double mb_per_second = messages_per_second * (double)Traits::kTestDataLength / (1024. * 1024.);
double ms_per_request = (double)duration.count() / 1000. / (double)test.request_count();
std::cout
<< "Executed " << test.request_count() << " produce requests" << std::endl
<< "Produced " << test.request_count() * Traits::kMessagesPerRequest << " messages" << std::endl
<< "Took " << duration << " / " << duration.count() / 1000000. << "s" << std::endl
<< requests_per_second << " requests per second" << std::endl
<< messages_per_second << " messages per second" << std::endl
<< mb_per_second << " MB per second" << std::endl
<< ms_per_request << " ms per request" << std::endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment