Skip to content

Instantly share code, notes, and snippets.

@terrancesnyder
Created June 5, 2013 20:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save terrancesnyder/5717067 to your computer and use it in GitHub Desktop.
Save terrancesnyder/5717067 to your computer and use it in GitHub Desktop.
kafka producers
public void testKafka() throws Exception {
Properties props = new Properties();
props.put("zk.connect", "zookeeper.cloudfront.io:2181");
props.put("zk.sessiontimeout.ms", "300000");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("compression.codec", "1"); // gzip
props.put("producer.type", "async");
props.put("batch.size", "248");
props.put("queue.enqueueTimeout.ms", "-1");
props.put("buffer.size", String.valueOf(64*1024));
ProducerConfig config = new ProducerConfig(props);
Producer<String, Message> producer = new Producer<String, Message>(config);
Stopwatch s = new Stopwatch();
s.start();
for (int i=0; i<10; i++) {
ProducerData<String, Message> data = new ProducerData<String, Message>("mytopic", new Message("Hello World!".getBytes()));
producer.send(data);
}
s.stop();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment