Skip to content

Instantly share code, notes, and snippets.

@danieljoos
Created April 7, 2015 14:29
Show Gist options
  • Save danieljoos/59c6867f7d316505dad3 to your computer and use it in GitHub Desktop.
Save danieljoos/59c6867f7d316505dad3 to your computer and use it in GitHub Desktop.
libkafka-asio: Producer Example with Hash Partitioning
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/random.hpp>
#include <boost/ref.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <libkafka_asio/libkafka_asio.h>
using libkafka_asio::Client;
using libkafka_asio::ProduceRequest;
using libkafka_asio::ProduceResponse;
using libkafka_asio::MetadataRequest;
using libkafka_asio::MetadataResponse;
using libkafka_asio::String;
using libkafka_asio::Int32;
//
// We use the following helper to create promise-based handler functions.
// The handler functions will set the value of a promise object.
// On error, an exception will be thrown in the thread, waiting for the promise.
//
template< typename T >
struct PromiseHelper
{
typedef boost::promise< typename T::ResponseType::OptionalType > PromiseType;
typedef boost::shared_ptr< PromiseType > SharedPromiseType;
typedef typename Client::Handler<T>::Type ResultType;
static ResultType Handler(SharedPromiseType pr)
{
struct local
{
static void Handle(const Client::ErrorCodeType& err,
const typename T::ResponseType::OptionalType& response,
SharedPromiseType result)
{
if (err)
{
result->set_exception(
boost::copy_exception(boost::system::system_error(err)));
return;
}
result->set_value(response);
}
};
return boost::bind(local::Handle, ::_1, ::_2, pr);
}
};
//
// Generates a random number in the given range.
// As this is an example, there is not much error handling here, so `min` must
// be smaller than `max`.
//
int RandomNumber(int min, int max)
{
boost::random::mt19937 gen;
boost::random::uniform_int_distribution<> dist(min, max);
return dist(gen);
}
//
// This is our little producer class.
// It sends requests through the given client and blocks the calling thread
// until each response has arrived. The client's handlers must therefore be
// executed on a different thread than the one calling `Produce`.
//
class TestProducer
{
public:
  // Stores a reference to `client`; the client must outlive the producer.
  TestProducer(Client& client) :
    client_(client),
    partition_count_(0)
  {
  }

  //
  // Produce the given amount of messages for the given topic.
  // The implementation first fetches metadata from Kafka, to know the amount
  // of partitions of the topic. It then produces `max` messages, one after
  // another. Errors are not propagated to the caller: they are reported on
  // `std::cerr` and stop the production of further messages.
  //
  void Produce(const String& topic_name, int max)
  {
    topic_ = topic_name;
    try
    {
      MetadataResponse metadata = GetMetadata();
      // Without at least one partition there is nothing to hash the key onto
      // (and `KeyedPartition` would compute a modulo by zero).
      if (metadata.topics().empty() ||
          metadata.topics()[0].partitions.empty())
      {
        throw std::runtime_error(
          "No partition metadata received for topic '" + topic_ + "'");
      }
      partition_count_ =
        static_cast<Int32>(metadata.topics()[0].partitions.size());
      for (int i = 0; i < max; ++i)
      {
        ProduceMessage(i);
        std::cout << "Produced message " << i << std::endl;
      }
    }
    catch (const std::exception& e)
    {
      std::cerr << "Error: " << e.what() << std::endl;
    }
  }

private:
  // Requests the metadata of `topic_` and blocks until it was received.
  // Throws if the request failed (see `PromiseHelper`).
  MetadataResponse GetMetadata()
  {
    typedef PromiseHelper<MetadataRequest> Helper;
    Helper::SharedPromiseType promise(new Helper::PromiseType());
    MetadataRequest request;
    request.AddTopicName(topic_);
    client_.AsyncRequest(request, Helper::Handler(promise));
    // NOTE(review): assumes the optional response is always set when no error
    // was reported - confirm against the client implementation.
    return *(promise->get_future().get());
  }

  // Produces message number `i` and blocks until the request has completed.
  // The message is keyed by a made-up IP address, which selects the
  // partition. Throws if the request failed.
  void ProduceMessage(int i)
  {
    typedef PromiseHelper<ProduceRequest> Helper;
    Helper::SharedPromiseType promise(new Helper::PromiseType());
    ProduceRequest request;
    request.set_required_acks(1);
    std::stringstream ip;
    ip << "192.168.2." << RandomNumber(0, 255);
    std::stringstream val;
    val << "Hello World " << i;
    request.AddValue(val.str(), topic_, KeyedPartition(ip.str()));
    client_.AsyncRequest(request, Helper::Handler(promise));
    promise->get_future().get();
  }

  // This kind-of implements the `KeyedMessage` of Kafka:
  // It returns the partition number for the given key string. The number is
  // calculated using FNV-1a hash algorithm.
  // The code for calculating the FNV-1a hash was taken from:
  // http://www.boost.org/doc/libs/1_38_0/libs/unordered/examples/fnv1.hpp
  //
  // Precondition: `partition_count_` is greater than zero (`Produce` verifies
  // this before any message is produced).
  // Note that the 64-bit FNV constants are only selected for `_WIN64` builds;
  // all other builds use the 32-bit constants.
  Int32 KeyedPartition(const String& key) const
  {
#ifdef _WIN64
    static const std::size_t fnv_prime = 1099511628211u;
    static const std::size_t fnv_offset_basis = 14695981039346656037u;
#else
    static const std::size_t fnv_prime = 16777619u;
    static const std::size_t fnv_offset_basis = 2166136261u;
#endif // _WIN64
    std::size_t hash = fnv_offset_basis;
    for (std::string::const_iterator it = key.begin(), end = key.end();
         it != end; ++it)
    {
      // FNV-1a works on octets. Going through `unsigned char` prevents sign
      // extension of bytes >= 0x80 on platforms where `char` is signed.
      hash ^= static_cast<unsigned char>(*it);
      hash *= fnv_prime;
    }
    return static_cast<Int32>(
      hash % static_cast<std::size_t>(partition_count_));
  }

  Client& client_;          // Used for all requests; not owned
  String topic_;            // Topic of the current `Produce` call
  Int32 partition_count_;   // Partitions of `topic_`; 0 until metadata arrived
};
//
// Main
//
int main(int argc, char** argv)
{
// All IO related work (and also all handler function) will be executed
// in a separate thread:
boost::asio::io_service ios;
boost::asio::io_service::work work(ios);
boost::thread worker(boost::bind(&boost::asio::io_service::run, &ios));
// Create a client and a producer:
Client::Configuration configuration;
configuration.auto_connect = true;
configuration.socket_timeout = 10000;
configuration.AddBrokerFromString("192.168.2.60:9092");
Client client(ios, configuration);
TestProducer producer(client);
// Produce 100 messages for topic 'page_visits':
producer.Produce("page_visits", 100);
// Teardown:
ios.stop();
worker.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment