Last active
August 29, 2015 14:25
-
-
Save danieljoos/45e5e8c5146504d16754 to your computer and use it in GitHub Desktop.
Simple performance measurement of produce requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <boost/asio.hpp> | |
#include <boost/chrono.hpp> | |
#include <boost/random.hpp> | |
#include <libkafka_asio/libkafka_asio.h> | |
using libkafka_asio::Client; | |
using libkafka_asio::ProduceRequest; | |
using libkafka_asio::ProduceResponse; | |
using libkafka_asio::Bytes; | |
using libkafka_asio::MessageSet; | |
using libkafka_asio::CompressMessageSet; | |
using libkafka_asio::constants::kCompressionGZIP; | |
using boost::chrono::high_resolution_clock; | |
using boost::chrono::duration_cast; | |
using boost::chrono::microseconds; | |
// | |
// Adds the plain messages to the request | |
// | |
template< typename Traits > | |
struct DefaultRequestGenerator | |
{ | |
static void Generate(ProduceRequest& request, Bytes& data) | |
{ | |
for (int i = 0; i < Traits::kMessagesPerRequest; ++i) | |
{ | |
request.AddValue(data, Traits::kTopicName(), Traits::kTopicPartition); | |
} | |
} | |
}; | |
// | |
// Adds a compressed message set to the request | |
// | |
template< typename Traits > | |
struct CompressedRequestGenerator | |
{ | |
static void Generate(ProduceRequest& request, Bytes& data) | |
{ | |
MessageSet msg_set(Traits::kMessagesPerRequest); | |
for (MessageSet::iterator iter = msg_set.begin(); iter != msg_set.end(); ++iter) | |
{ | |
iter->mutable_value() = data; | |
} | |
boost::system::error_code ec; | |
request.AddMessage( | |
CompressMessageSet(msg_set, kCompressionGZIP, ec), | |
Traits::kTopicName(), | |
Traits::kTopicPartition); | |
} | |
}; | |
// | |
// Some test parameters | |
// | |
struct Traits | |
{ | |
static const int kMaxRequests = 1000; | |
static const int kMessagesPerRequest = 1000; | |
static const size_t kTestDataLength = 100; | |
static const char* kTopicName() { return "test3"; } | |
static const int kTopicPartition = 0; | |
static const char* kBroker() { return "localhost:9092"; } | |
static const int kRequiredAcks = 0; | |
typedef DefaultRequestGenerator<Traits> RequestGeneratorType; | |
}; | |
// | |
// The test producer | |
// | |
class Test | |
{ | |
typedef boost::scoped_ptr<Client> ScopedClientType; | |
typedef boost::asio::io_service IoServiceType; | |
public: | |
Test() | |
{ | |
// Create some random ASCII test data | |
boost::random::mt19937 gen; | |
boost::random::uniform_int_distribution<> ascii_dist(32, 126); | |
test_data_.reset(new Bytes::element_type(Traits::kTestDataLength, 'x')); | |
for (Bytes::element_type::iterator iter = test_data_->begin(); | |
iter != test_data_->end(); | |
++iter) | |
{ | |
*iter = ascii_dist(gen); | |
} | |
} | |
// | |
// Opens a connection to the broker and runs until all requests have been processed | |
// | |
void Run() | |
{ | |
Client::Configuration configuration; | |
configuration.auto_connect = true; | |
configuration.AddBrokerFromString(Traits::kBroker()); | |
client_.reset(new Client(io_service_, configuration)); | |
request_count_ = 0; | |
PrepareNextRequest(); | |
io_service_.run(); | |
} | |
long request_count() const | |
{ | |
return request_count_; | |
} | |
private: | |
// | |
// Creates a new ProduceRequest, fills it with data and schedules sending it to the broker | |
// | |
void PrepareNextRequest() | |
{ | |
if (request_count_ >= Traits::kMaxRequests) | |
{ | |
client_->Close(); | |
return; | |
} | |
ProduceRequest request; | |
request.set_required_acks(Traits::kRequiredAcks); | |
Traits::RequestGeneratorType::Generate(request, test_data_); | |
client_->AsyncRequest(request, boost::bind(&Test::HandleRequest, this, ::_1, ::_2)); | |
} | |
// | |
// Handles the produce request and triggers the creation of the next request | |
// | |
void HandleRequest( | |
const Client::ErrorCodeType& err, | |
const ProduceResponse::OptionalType& response) | |
{ | |
if (err) | |
{ | |
std::cerr << "Error: " << boost::system::system_error(err).what() << std::endl; | |
return; | |
} | |
++request_count_; | |
PrepareNextRequest(); | |
} | |
IoServiceType io_service_; | |
ScopedClientType client_; | |
long request_count_; | |
Bytes test_data_; | |
microseconds time_waited_; | |
high_resolution_clock::time_point start_time_point_; | |
}; | |
int main(int argc, char **argv) | |
{ | |
// Create the test runner | |
Test test; | |
// Execute the test and measure the execution time | |
high_resolution_clock::time_point time_start = high_resolution_clock::now(); | |
test.Run(); | |
high_resolution_clock::time_point time_end = high_resolution_clock::now(); | |
// | |
// Show some output on the console | |
// | |
microseconds duration = duration_cast<microseconds>(time_end - time_start); | |
double requests_per_second = (double)test.request_count() * 1000000. / (double)duration.count(); | |
double messages_per_second = requests_per_second * (double)Traits::kMessagesPerRequest; | |
double mb_per_second = messages_per_second * (double)Traits::kTestDataLength / (1024. * 1024.); | |
double ms_per_request = (double)duration.count() / 1000. / (double)test.request_count(); | |
std::cout | |
<< "Executed " << test.request_count() << " produce requests" << std::endl | |
<< "Produced " << test.request_count() * Traits::kMessagesPerRequest << " messages" << std::endl | |
<< "Took " << duration << " / " << duration.count() / 1000000. << "s" << std::endl | |
<< requests_per_second << " requests per second" << std::endl | |
<< messages_per_second << " messages per second" << std::endl | |
<< mb_per_second << " MB per second" << std::endl | |
<< ms_per_request << " ms per request" << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment