@emaxerrno
Created June 10, 2016 21:09
crash.cc
#include <iostream>
#include <fstream>
#include <memory>
#include <string>
#include <vector>
#include <unistd.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <librdkafka/rdkafkacpp.h>
#include <thread>
static uint64_t count = 0;
static uint64_t bytesSent = 0;
DEFINE_string(file_name, "", "Name of file to produce");
DEFINE_string(topic_name, "", "Name of topic to produce onto");
DEFINE_string(broker_addr, "", "Address of kafka broker");
void produce_line(std::string &line,
                  RdKafka::Producer *producer,
                  RdKafka::Topic *topic) {
  // Use the first 24 bytes of the line as the message key.
  std::string key = line.size() < 24 ? line : line.substr(0, 24);
  // produce() arguments:
  // 1. topic
  // 2. partition
  // 3. flags
  // 4. payload
  // 5. payload len
  // 6. std::string key
  // 7. msg_opaque
  RdKafka::ErrorCode resp;
  // Retry until the message is accepted into the local queue (it may be
  // rejected while the queue is full); back off briefly and serve any
  // queued callbacks between attempts instead of spinning.
  while((resp = producer->produce(
             topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
             const_cast<char *>(line.c_str()), line.size(), &key, NULL))
        != RdKafka::ERR_NO_ERROR) {
    LOG(ERROR) << "Issue when producing: " << RdKafka::err2str(resp)
               << " .. attempting again";
    producer->poll(100);
  }
  bytesSent += line.size();
  if(++count % 1000000 == 0) {
    producer->poll(5);
    LOG(INFO) << "Total lines sent: " << count;
  }
}
int main(int argc, char **argv) {
  google::SetUsageMessage("KafkaFill\n"
                          "Usage:\n"
                          "\t./kafka_fill\t--file_name fn \t--topic_name tn "
                          "\t--broker_addr localhost:9092 \t --limit_gb 1 \\\n"
                          "\n");
  google::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);
  std::string err;
  auto *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  auto *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  auto status = conf->set("metadata.broker.list", FLAGS_broker_addr, err);
  if(status != RdKafka::Conf::CONF_OK) {
    throw std::runtime_error(err);
  }
  // Default value: 100000
  status = conf->set("queue.buffering.max.messages", "10000000", err);
  if(status != RdKafka::Conf::CONF_OK) {
    throw std::runtime_error(err);
  }
  auto *producer = RdKafka::Producer::create(conf, err);
  if(!producer) {
    throw std::runtime_error(err);
  }
  auto *topic = RdKafka::Topic::create(producer, FLAGS_topic_name, tconf, err);
  if(!topic) {
    throw std::runtime_error(err);
  }
  std::ifstream infile(FLAGS_file_name);
  std::string line;
  // Size cap mentioned in the usage message; left disabled (no --limit_gb flag is defined).
  // const uint64_t bytesLimit = FLAGS_limit_gb * 1000000000;
  while(std::getline(infile, line)) {
    produce_line(line, producer, topic);
  }
  // Wait for all queued messages to be handed off to the broker before tearing down.
  int outq;
  while((outq = producer->outq_len()) > 0) {
    LOG(INFO) << "Waiting to drain queue: " << outq;
    producer->poll(5000);
  }
  delete topic;
  delete producer;
  delete tconf;
  delete conf;
  return 0;
}
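
Note: produce() only reports synchronous errors such as a full local queue; broker-side delivery failures are surfaced through a delivery-report callback that poll() serves. The sketch below shows one way such a callback could be wired in, using librdkafka's RdKafka::DeliveryReportCb interface; the LoggingDeliveryReportCb name and its placement are illustrative additions, not part of the original gist.

// Hypothetical addition: log asynchronous delivery failures.
class LoggingDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
  void dr_cb(RdKafka::Message &message) override {
    if(message.err() != RdKafka::ERR_NO_ERROR) {
      LOG(ERROR) << "Delivery failed: " << message.errstr();
    }
  }
};
// In main(), before RdKafka::Producer::create():
//   static LoggingDeliveryReportCb dr_logger;
//   if(conf->set("dr_cb", &dr_logger, err) != RdKafka::Conf::CONF_OK) {
//     throw std::runtime_error(err);
//   }

If a callback is installed, poll() has to run more often than once per million messages, otherwise the delivery-report queue grows without bound.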