Created
April 7, 2015 14:29
-
-
Save danieljoos/59c6867f7d316505dad3 to your computer and use it in GitHub Desktop.
libkafka-asio: Producer Example with Hash Partitioning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <sstream> | |
#include <boost/asio.hpp> | |
#include <boost/bind.hpp> | |
#include <boost/function.hpp> | |
#include <boost/random.hpp> | |
#include <boost/ref.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/thread.hpp> | |
#include <libkafka_asio/libkafka_asio.h> | |
using libkafka_asio::Client; | |
using libkafka_asio::ProduceRequest; | |
using libkafka_asio::ProduceResponse; | |
using libkafka_asio::MetadataRequest; | |
using libkafka_asio::MetadataResponse; | |
using libkafka_asio::String; | |
using libkafka_asio::Int32; | |
// | |
// We use the following helper to create promise-based handler functions. | |
// The handler functions will set the value of a promise object. | |
// On error, an exception will be thrown in the thread, waiting for the promise. | |
// | |
template< typename T > | |
struct PromiseHelper | |
{ | |
typedef boost::promise< typename T::ResponseType::OptionalType > PromiseType; | |
typedef boost::shared_ptr< PromiseType > SharedPromiseType; | |
typedef typename Client::Handler<T>::Type ResultType; | |
static ResultType Handler(SharedPromiseType pr) | |
{ | |
struct local | |
{ | |
static void Handle(const Client::ErrorCodeType& err, | |
const typename T::ResponseType::OptionalType& response, | |
SharedPromiseType result) | |
{ | |
if (err) | |
{ | |
result->set_exception( | |
boost::copy_exception(boost::system::system_error(err))); | |
return; | |
} | |
result->set_value(response); | |
} | |
}; | |
return boost::bind(local::Handle, ::_1, ::_2, pr); | |
} | |
}; | |
// | |
// Generates a random number in the given range. | |
// As this is an example, there is not much error handling here, so `min` must | |
// be smaller than `max`. | |
// | |
int RandomNumber(int min, int max) | |
{ | |
boost::random::mt19937 gen; | |
boost::random::uniform_int_distribution<> dist(min, max); | |
return dist(gen); | |
} | |
// | |
// This is our little producer class. | |
// | |
class TestProducer | |
{ | |
public: | |
TestProducer(Client& client) : | |
client_(client) | |
{ | |
} | |
// | |
// Produce the given amount of messages for the given topic. | |
// The implementation first fetches metadata from Kafka, to know the amount | |
// of partitions of the topic. It then produces `max` messages. | |
// | |
void Produce(const String& topic_name, int max) | |
{ | |
topic_ = topic_name; | |
try | |
{ | |
MetadataResponse metadata = GetMetadata(); | |
partition_count_ = metadata.topics()[0].partitions.size(); | |
for (int i = 0; i < max; ++i) | |
{ | |
ProduceMessage(i); | |
std::cout << "Produced message " << i << std::endl; | |
} | |
} | |
catch(std::exception& e) | |
{ | |
std::cerr << "Error: " << e.what() << std::endl; | |
} | |
} | |
private: | |
// Blocks until the metadata was received | |
MetadataResponse GetMetadata() | |
{ | |
typedef PromiseHelper<MetadataRequest> Helper; | |
Helper::SharedPromiseType promise(new Helper::PromiseType()); | |
MetadataRequest request; | |
request.AddTopicName(topic_); | |
client_.AsyncRequest(request, Helper::Handler(promise)); | |
return *(promise->get_future().get()); | |
} | |
// Blocks until the message was produced | |
void ProduceMessage(int i) | |
{ | |
typedef PromiseHelper<ProduceRequest> Helper; | |
Helper::SharedPromiseType promise(new Helper::PromiseType()); | |
ProduceRequest request; | |
request.set_required_acks(1); | |
std::stringstream ip; | |
ip << "192.168.2." << RandomNumber(0, 255); | |
std::stringstream val; | |
val << "Hello World " << i; | |
request.AddValue(val.str(), topic_, KeyedPartition(ip.str())); | |
client_.AsyncRequest(request, Helper::Handler(promise)); | |
promise->get_future().get(); | |
} | |
// This kind-of implements the `KeyedMessage` of Kafka: | |
// It returns the partition number for the given key string. The number is | |
// calculated using FNV-1a hash algorithm. | |
// The code for calculating the FNV-1a hash was taken from: | |
// http://www.boost.org/doc/libs/1_38_0/libs/unordered/examples/fnv1.hpp | |
Int32 KeyedPartition(const String& key) | |
{ | |
#ifdef _WIN64 | |
static const std::size_t fnv_prime = 1099511628211u; | |
static const std::size_t fnv_offset_basis = 14695981039346656037u; | |
#else | |
static const std::size_t fnv_prime = 16777619u; | |
static const std::size_t fnv_offset_basis = 2166136261u; | |
#endif // _WIN64 | |
std::size_t hash = fnv_offset_basis; | |
for(std::string::const_iterator it = key.begin(), end = key.end(); | |
it != end; ++it) | |
{ | |
hash ^= *it; | |
hash *= fnv_prime; | |
} | |
return hash % partition_count_; | |
} | |
Client& client_; | |
String topic_; | |
Int32 partition_count_; | |
}; | |
// | |
// Main | |
// | |
int main(int argc, char** argv) | |
{ | |
// All IO related work (and also all handler function) will be executed | |
// in a separate thread: | |
boost::asio::io_service ios; | |
boost::asio::io_service::work work(ios); | |
boost::thread worker(boost::bind(&boost::asio::io_service::run, &ios)); | |
// Create a client and a producer: | |
Client::Configuration configuration; | |
configuration.auto_connect = true; | |
configuration.socket_timeout = 10000; | |
configuration.AddBrokerFromString("192.168.2.60:9092"); | |
Client client(ios, configuration); | |
TestProducer producer(client); | |
// Produce 100 messages for topic 'page_visits': | |
producer.Produce("page_visits", 100); | |
// Teardown: | |
ios.stop(); | |
worker.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment