Last active June 12, 2020 08:03
Kafka consumer with Avro
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Controller;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
public class KafkaConsumer {
public void consume()
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:8080");
props.put("", "group1");
// to read messages from the beginning
// first time for every group
props.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
try {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic1", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this.topic);
if (streams.size() > 0)
} catch (Throwable e) {
private void readMessages(KafkaStream<byte[], byte[]> stream) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
String firstName = null;
try {
MessageAndMetadata<byte[], byte[]> messageAndMetadata =;
//"raeding message: " + messageAndMetadata.offset());
GenericRecord genericRecord = byteArrayToDatum(getSchema(), messageAndMetadata.message());
firstName = getValue(genericRecord, "firstName", String.class);
//"reading record: " + server + ", " + timestamp);
} catch (Exception e) {
public static GenericRecord byteArrayToDatum(Schema schema, byte[] byteData) {
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
ByteArrayInputStream byteArrayInputStream = null;
try {
byteArrayInputStream = new ByteArrayInputStream(byteData);
Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
return, decoder);
} catch (IOException e) {
return null;
} finally {
try {
} catch (IOException e) {
public static Schema getSchema()
String schemaStr = "{\"namespace\": \"\",\n" +
" \"type\": \"record\",\n" +
" \"name\": \"GSD\",\n" +
" \"fields\": [\n" +
" {\"name\": \"firstName\", \"type\": \"string\"},\n" +
" ]\n" +
return new Schema.Parser().parse(schemaStr);
public static <T> T getValue(GenericRecord genericRecord, String name, Class<T> clazz)
Object obj = genericRecord.get(name);
if (obj == null)
return null;
if (obj.getClass() == Utf8.class)
return (T) obj.toString();
if (obj.getClass() == Integer.class)
return (T) obj;
return null;
When I run this against my data with its respective schema, it fails with "AvroRuntimeException: Malformed data. Length is negative: -40". Have you ever run into an error like this? Everything works up to the line "Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);". I have tried other decoders, and I printed out the contents of the byteArrayInputStream variable, which look the way you'd expect serialized data to look (in the message I can see the schema, some data, and some malformed data). The stream reports 594 bytes available. I am having trouble understanding why this error keeps happening. I would appreciate any help you can offer. Thank you for posting the code; it has been very helpful thus far.

