
@yaroncon
Last active June 12, 2020 08:03
Kafka consumer with Avro
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer {

    private static final String TOPIC = "topic1";

    public void consume() {
        Properties props = new Properties();
        // ZooKeeper connection string (2181 is ZooKeeper's default client port)
        props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "group1");
// to read messages from the beginning
// first time for every group
props.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
try {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic1", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this.topic);
if (streams.size() > 0)
{
readMessages(streams.get(0));
}
} catch (Throwable e) {
e.printStackTrace();
}
}
    private void readMessages(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String firstName = null;
            try {
                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                // LOG.info("reading message: " + messageAndMetadata.offset());
                GenericRecord genericRecord = byteArrayToDatum(getSchema(), messageAndMetadata.message());
                firstName = getValue(genericRecord, "firstName", String.class);
                // LOG.info("read record: firstName=" + firstName);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public static GenericRecord byteArrayToDatum(Schema schema, byte[] byteData) {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        ByteArrayInputStream byteArrayInputStream = null;
        try {
            byteArrayInputStream = new ByteArrayInputStream(byteData);
            // expects a raw Avro binary datum (no container-file header)
            Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            return null;
        } finally {
            try {
                if (byteArrayInputStream != null) {
                    byteArrayInputStream.close();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }
    public static Schema getSchema() {
        String schemaStr = "{\"namespace\": \"org.test.data\",\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"GSD\",\n" +
                "  \"fields\": [\n" +
                "    {\"name\": \"firstName\", \"type\": \"string\"}\n" +
                "  ]\n" +
                "}";
        return new Schema.Parser().parse(schemaStr);
    }
    @SuppressWarnings("unchecked")
    public static <T> T getValue(GenericRecord genericRecord, String name, Class<T> clazz) {
        Object obj = genericRecord.get(name);
        if (obj == null) {
            return null;
        }
        // Avro decodes strings as Utf8; convert to java.lang.String
        if (obj.getClass() == Utf8.class) {
            return (T) obj.toString();
        }
        if (obj.getClass() == Integer.class) {
            return (T) obj;
        }
        return null;
    }
}
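For reference, byteArrayToDatum above assumes each Kafka message payload is a raw Avro binary datum written with the same schema, not an Avro container file (which would carry its own header and embedded schema). A minimal sketch of the matching encoder plus a local round trip, under that assumption, could look like the following; KafkaAvroEncoding and datumToByteArray are illustrative names, not part of the original gist.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class KafkaAvroEncoding {

    // Inverse of KafkaConsumer.byteArrayToDatum: serialize a GenericRecord into the
    // raw binary form that DecoderFactory.binaryDecoder() expects on the consumer side.
    public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(datum, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Local round trip without Kafka: build a record, encode it, decode it with the consumer's helper.
    public static void main(String[] args) throws IOException {
        Schema schema = KafkaConsumer.getSchema();
        GenericRecord record = new GenericData.Record(schema);
        record.put("firstName", "Ada");
        byte[] bytes = datumToByteArray(schema, record);
        GenericRecord decoded = KafkaConsumer.byteArrayToDatum(schema, bytes);
        System.out.println(KafkaConsumer.getValue(decoded, "firstName", String.class));
    }
}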
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
</dependency>
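For completeness, the encoded bytes could be published with the 0.8-era producer API that ships in the kafka_2.10 dependency above. This is only a sketch under assumed settings; the broker address "localhost:9092" and the topic name "topic1" are assumptions that must match your setup.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class KafkaAvroProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");              // assumed broker address
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // pass byte[] payloads through unchanged
        Producer<String, byte[]> producer = new Producer<String, byte[]>(new ProducerConfig(props));

        // Build one record with the gist's schema and encode it as a raw Avro binary datum.
        Schema schema = KafkaConsumer.getSchema();
        GenericRecord record = new GenericData.Record(schema);
        record.put("firstName", "Ada");
        byte[] payload = KafkaAvroEncoding.datumToByteArray(schema, record);

        // Publish to the topic the consumer reads from.
        producer.send(new KeyedMessage<String, byte[]>("topic1", payload));
        producer.close();
    }
}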
@sparkleGoat

When I ran this over my data with its schema, it returned "AvroRuntimeException: Malformed data. Length is negative: -40". Have you ever run into an error like this? It works up until "Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);". I have tried other decoders and printed out the contents of the byteArrayInputStream variable, which looks the way you'd expect serialized data to look (in the message I can see the schema, some data, and some malformed data), and it reports 594 bytes available. I am having trouble understanding why this error keeps happening and would appreciate any help. Thank you for posting the code; it has been very helpful so far.
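Not part of the gist, but since the payload described above apparently contains the schema itself, one possible cause (an assumption, not a confirmed diagnosis) is that the bytes were written as an Avro container file rather than as a raw binary datum; binaryDecoder then misreads the file header, which can surface as a "Length is negative" error. Under that assumption, such a payload could be read with DataFileStream instead, sketched below with illustrative class and method names.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ContainerFilePayloadReader {

    // Decode a payload that was written as an Avro container file (DataFileWriter),
    // i.e. one that starts with the "Obj" magic bytes and embeds its own schema.
    public static GenericRecord firstDatumFromContainerBytes(byte[] bytes) throws IOException {
        DataFileStream<GenericRecord> stream = new DataFileStream<GenericRecord>(
                new ByteArrayInputStream(bytes), new GenericDatumReader<GenericRecord>());
        try {
            return stream.hasNext() ? stream.next() : null;
        } finally {
            stream.close();
        }
    }
}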
