Skip to content

Instantly share code, notes, and snippets.

@TechGeeky
Last active February 16, 2017 04:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TechGeeky/aff1fc6e6af269fb19ee59d3d2df5e2a to your computer and use it in GitHub Desktop.
Save TechGeeky/aff1fc6e6af269fb19ee59d3d2df5e2a to your computer and use it in GitHub Desktop.
public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
public Message(Partition partition, Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
checkArgument(MapUtils.isNotEmpty(clientKeyBytesAndProcessBytesHolder),
"clientKeyBytesAndProcessBytesHolder cannot be null or empty, found '%s'.", clientKeyBytesAndProcessBytesHolder);
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.partition = (byte) partition.getPartition();
this.dataCenter = TestUtils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new AddressData().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
public Message(byte[] packedArray) {
ByteBuffer bb = ByteBuffer.wrap(packedArray).order(ByteOrder.BIG_ENDIAN);
bb.rewind();
this.dataCenter = bb.get();
this.recordVersion = bb.get();
int numOfRecords = bb.getInt();
int bufferUsed = bb.getInt();
this.address = bb.getLong();
this.addressFrom = bb.getLong();
this.addressOrigin = bb.getLong();
this.partition = bb.get();
this.replicated = bb.get();
this.clientKeyBytesAndProcessBytesHolder = new HashMap<>();
for (int i = 0; i < numOfRecords; i++) {
byte keyType = bb.get();
byte keyLength = bb.get();
byte[] extractKeyValue = new byte[keyLength];
bb.get(extractKeyValue);
String key = new String(extractKeyValue, StandardCharsets.UTF_8);
long timestamp = bb.getLong();
short dataLength = bb.getShort();
byte[] extractDataValue = new byte[dataLength];
bb.get(extractDataValue);
clientKeyBytesAndProcessBytesHolder.put(extractKeyValue, extractDataValue);
}
}
public byte[] serialize() {
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(partition).put(replicated);
// now the data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}
/**
* Below method is used to get the actual Buffer used for the packed byte array.
*
* @param clientKeyBytesAndProcessBytesHolder
* @return
*/
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = QueueUtils.HEADER_SIZE;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
// getters and toString method
}
=================================================================================================
public final class Message {
private final Partition partition;
private final String clientKey;
private final byte[] processBytes;
public Message(Partition partition, String clientKey, byte[] processBytes) {
super();
this.partition = partition;
this.clientKey = clientKey;
this.processBytes = processBytes;
}
// getters and toString here
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment