-
-
Save TechGeeky/aff1fc6e6af269fb19ee59d3d2df5e2a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class Message { | |
private final byte dataCenter; | |
private final byte recordVersion; | |
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder; | |
private final long address; | |
private final long addressFrom; | |
private final long addressOrigin; | |
private final byte partition; | |
private final byte replicated; | |
public Message(Partition partition, Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) { | |
checkArgument(MapUtils.isNotEmpty(clientKeyBytesAndProcessBytesHolder), | |
"clientKeyBytesAndProcessBytesHolder cannot be null or empty, found '%s'.", clientKeyBytesAndProcessBytesHolder); | |
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder; | |
this.partition = (byte) partition.getPartition(); | |
this.dataCenter = TestUtils.CURRENT_LOCATION.get().datacenter(); | |
this.recordVersion = 1; | |
this.replicated = 0; | |
long packedAddress = new AddressData().packAddress(); | |
this.address = packedAddress; | |
this.addressFrom = 0L; | |
this.addressOrigin = packedAddress; | |
} | |
public Message(byte[] packedArray) { | |
ByteBuffer bb = ByteBuffer.wrap(packedArray).order(ByteOrder.BIG_ENDIAN); | |
bb.rewind(); | |
this.dataCenter = bb.get(); | |
this.recordVersion = bb.get(); | |
int numOfRecords = bb.getInt(); | |
int bufferUsed = bb.getInt(); | |
this.address = bb.getLong(); | |
this.addressFrom = bb.getLong(); | |
this.addressOrigin = bb.getLong(); | |
this.partition = bb.get(); | |
this.replicated = bb.get(); | |
this.clientKeyBytesAndProcessBytesHolder = new HashMap<>(); | |
for (int i = 0; i < numOfRecords; i++) { | |
byte keyType = bb.get(); | |
byte keyLength = bb.get(); | |
byte[] extractKeyValue = new byte[keyLength]; | |
bb.get(extractKeyValue); | |
String key = new String(extractKeyValue, StandardCharsets.UTF_8); | |
long timestamp = bb.getLong(); | |
short dataLength = bb.getShort(); | |
byte[] extractDataValue = new byte[dataLength]; | |
bb.get(extractDataValue); | |
clientKeyBytesAndProcessBytesHolder.put(extractKeyValue, extractDataValue); | |
} | |
} | |
public byte[] serialize() { | |
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); | |
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN); | |
// header layout | |
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size()) | |
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin) | |
.put(partition).put(replicated); | |
// now the data layout | |
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) { | |
byte keyType = 0; | |
byte[] key = entry.getKey(); | |
byte[] value = entry.getValue(); | |
byte keyLength = (byte) key.length; | |
short valueLength = (short) value.length; | |
ByteBuffer dataBuffer = ByteBuffer.wrap(value); | |
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis(); | |
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength) | |
.put(value); | |
} | |
return byteBuffer.array(); | |
} | |
/** | |
* Below method is used to get the actual Buffer used for the packed byte array. | |
* | |
* @param clientKeyBytesAndProcessBytesHolder | |
* @return | |
*/ | |
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) { | |
int size = QueueUtils.HEADER_SIZE; | |
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) { | |
size += 1 + 1 + 8 + 2; | |
size += entry.getKey().length; | |
size += entry.getValue().length; | |
} | |
return size; | |
} | |
// getters and toString method | |
} | |
================================================================================================= | |
public final class Message { | |
private final Partition partition; | |
private final String clientKey; | |
private final byte[] processBytes; | |
public Message(Partition partition, String clientKey, byte[] processBytes) { | |
super(); | |
this.partition = partition; | |
this.clientKey = clientKey; | |
this.processBytes = processBytes; | |
} | |
// getters and toString here | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment