Skip to content

Instantly share code, notes, and snippets.

@Fluxx
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Fluxx/52950530322c09267c5d to your computer and use it in GitHub Desktop.
Save Fluxx/52950530322c09267c5d to your computer and use it in GitHub Desktop.
diff --git a/src/main/java/com/pinterest/secor/common/KafkaClient.java b/src/main/java/com/pinterest/secor/common/KafkaClient.java
index 3fe96a2..d7061dc 100644
--- a/src/main/java/com/pinterest/secor/common/KafkaClient.java
+++ b/src/main/java/com/pinterest/secor/common/KafkaClient.java
@@ -131,8 +131,11 @@ public class KafkaClient {
ByteBuffer payload = messageAndOffset.message().payload();
byte[] payloadBytes = new byte[payload.limit()];
payload.get(payloadBytes);
+ ByteBuffer key = messageAndOffset.message().key();
+ byte[] keyBytes = new byte[key.limit()];
+ key.get(keyBytes);
return new Message(topicPartition.getTopic(), topicPartition.getPartition(),
- messageAndOffset.offset(), payloadBytes);
+ messageAndOffset.offset(), keyBytes, payloadBytes);
}
private SimpleConsumer createConsumer(TopicPartition topicPartition) {
diff --git a/src/main/java/com/pinterest/secor/message/Message.java b/src/main/java/com/pinterest/secor/message/Message.java
index c50ba94..98681ca 100644
--- a/src/main/java/com/pinterest/secor/message/Message.java
+++ b/src/main/java/com/pinterest/secor/message/Message.java
@@ -29,12 +29,14 @@ public class Message {
private String mTopic;
private int mKafkaPartition;
private long mOffset;
+ private byte[] mKey;
private byte[] mPayload;
protected String fieldsToString() {
return "topic='" + mTopic + '\'' +
", kafkaPartition=" + mKafkaPartition +
", offset=" + mOffset +
+ ", key=" + mKey +
", payload=" + new String(mPayload);
}
@@ -44,10 +46,12 @@ public class Message {
return "Message{" + fieldsToString() + '}';
}
- public Message(String topic, int kafkaPartition, long offset, byte[] payload) {
+ public Message(String topic, int kafkaPartition, long offset, byte[] key,
+ byte[] payload) {
mTopic = topic;
mKafkaPartition = kafkaPartition;
mOffset = offset;
+ mKey = key;
mPayload = payload;
}
@@ -64,6 +68,10 @@ public class Message {
return mOffset;
}
+ public byte[] getKey() {
+ return mKey;
+ }
+
public byte[] getPayload() {
return mPayload;
}
diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java
index 6fd0b38..f1158b8 100644
--- a/src/main/java/com/pinterest/secor/reader/MessageReader.java
+++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java
@@ -123,7 +123,8 @@ public class MessageReader {
RateLimitUtil.acquire();
MessageAndMetadata<byte[], byte[]> kafkaMessage = mIterator.next();
Message message = new Message(kafkaMessage.topic(), kafkaMessage.partition(),
- kafkaMessage.offset(), kafkaMessage.message());
+ kafkaMessage.offset(), kafkaMessage.key(),
+ kafkaMessage.message());
TopicPartition topicPartition = new TopicPartition(message.getTopic(),
message.getKafkaPartition());
updateAccessTime(topicPartition);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment