Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
sample kafka async producer demonstrating callback
import com.google.common.io.Resources;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Created by rasingh on 10/2/16.
*/
public class MyProducerWithCallBack {
final static Logger logger = Logger.getLogger(MyProducer.class);
public static void main(String[] args) throws IOException {
if(args.length<1){
System.out.println("usage : java -cp <>.jar MyProducer <topic>");
System.exit(-1);
}
KafkaProducer<String, String> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
}
try {
int i=0;
while(true) {
//ProducerRecord<String, String> record = new ProducerRecord<>(args[0], "value-" + i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(args[0],"key-"+i,"value-"+i);
producer.send(record,new ProducerCallBack());
i++;
//Thread.sleep(30000);
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
} finally {
producer.close();
}
}
private static class ProducerCallBack implements org.apache.kafka.clients.producer.Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception !=null){
System.out.println(metadata.topic()+metadata.offset()+metadata.partition());
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment