Skip to content

Instantly share code, notes, and snippets.

@michiel
Forked from terrancesnyder/kafka-consumer.java
Created October 11, 2015 17:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michiel/8dd57f581f7a25108d3b to your computer and use it in GitHub Desktop.
Save michiel/8dd57f581f7a25108d3b to your computer and use it in GitHub Desktop.
Simple example of publishing avro messages to Kafka.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.ByteBufferInputStream;
import org.apache.commons.io.IOUtils;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import kafka.producer.ProducerConfig;
public class KafkaAvroPublisherExample {
private final Producer<String, Message> kafkaProducer;
private static final SpecificDatumWriter<Event> avroEventWriter = new SpecificDatumWriter<Event>(Event.SCHEMA$);
private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();
public KafkaAvroPublisherExample(MyKafaSettings settings) {
Properties props = new Properties();
props.put("zk.connect", settings.zookeeper);
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("compression.codec", settings.compression); // snappy
props.put("message.send.max.retries", settings.maxRetry);
props.put("batch.num.messages", settings.batchSize);
props.put("client.id", settings.applicationId);
kafkaProducer = new Producer<String, Message>(new ProducerConfig(props));
}
public void publish(Event event) {
try {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = encoderFactory.binaryEncoder(stream, null);
avroEventWriter.write(event, binaryEncoder);
binaryEncoder.flush();
IOUtils.closeQuietly(stream);
Message m = new Message(stream.toByteArray());
producer.send(new ProducerData<String, Message>("my-topic", "my-partition-key", Lists.newArrayList(m)));
} catch (IOException e) {
throw new RuntimeException("Avro serialization failure", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment