@yaroncon
Last active November 19, 2020 06:12
Kafka producer, with Kafka-Client and Avro
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class KafkaAvroProducer {

    public void produce() {
        // Target partition; the topic must have at least two partitions for this to succeed.
        Integer partition = 1;

        // Producer configuration: keys are serialized as strings, values as raw Avro-encoded bytes.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("block.on.buffer.full", "false");

        // Avro schema for the record: a single string field, firstName.
        String schemaStr = "{\"namespace\": \"org.test.data\",\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"user\",\n" +
                "  \"fields\": [\n" +
                "    {\"name\": \"firstName\", \"type\": \"string\"}\n" +
                "  ]\n" +
                "}";
        Schema schema = new Schema.Parser().parse(schemaStr);

        // Build a generic Avro record that conforms to the schema.
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("firstName", "yaron");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(properties);
        try {
            ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>(
                    "topic",
                    partition,
                    "key",
                    datumToByteArray(schema, datum));
            messageProducer.send(producerRecord);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Serialize a GenericRecord to a byte array using Avro's binary encoding.
    public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        try {
            Encoder e = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(datum, e);
            e.flush();
            return os.toByteArray();
        } finally {
            os.close();
        }
    }
}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
</dependency>
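
On the consumer side, the value bytes can be decoded back into a GenericRecord with Avro's binary decoder. The sketch below is illustrative only: the byteArrayToDatum helper and its enclosing class are hypothetical names not in the gist, and it assumes the reader is given the same user schema that was used for writing.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;

public class AvroDecodingSketch {
    // Hypothetical inverse of datumToByteArray: decode an Avro binary
    // payload back into a GenericRecord using the writer's schema.
    public static GenericRecord byteArrayToDatum(Schema schema, byte[] bytes) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}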

ghost commented Aug 26, 2015

Good example, thanks. A much simpler design than others I've seen.
Though the Javadocs state: "Failure to close the producer after use will leak these resources."
I added messageProducer.close() after the send() call.
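
A minimal sketch of that change, reusing the configuration from produce() and the datumToByteArray helper from the class above; the sendAndClose wrapper and its class name are illustrative, not part of the gist.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;

public class ClosingProducerSketch {
    // Close the producer after use so its buffers and background I/O thread are released.
    public static void sendAndClose(Properties properties, Integer partition,
                                    Schema schema, GenericRecord datum) {
        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(properties);
        try {
            messageProducer.send(new ProducerRecord<String, byte[]>(
                    "topic", partition, "key",
                    KafkaAvroProducer.datumToByteArray(schema, datum)));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            messageProducer.close();
        }
    }
}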
