- Cấp phát bộ nhớ cho ByteBuffer sử dụng method là allocateDirect (thay vì allocate), khi xài method này thì Java sẽ gọi thẳng tới lệnh cấp phát bộ nhớ từ OS, và bộ nhớ được cấp phát sẽ không nằm trong sự quản lí của Java Heap Space cũng như Java GC.
Person person = new Person();
//…
ByteBuffer bb = ByteBuffer.allocateDirect(size);
bb.putInt(person.age);
bb.putFloat(person.weeklySalary);
bb.put(person.lastName.getBytes());
bb.put(person.firstName.getBytes());
bb.put((byte)(person.fullTime == true ? 1 : 0 ));
- Với mỗi lần write, con trỏ tự động được cập nhật lại ở vị trí tiếp theo
Person person = new Person();
Person.age = bb.getInt();
person.weeklySalary = bb.getFloat();
- Để read một danh sách các dữ liệu, ta cần phải biết (và set) vị trí con trỏ nơi bắt đầu. Ví dụ, ở trên bạn đã write dữ liệu vào ByteBuffer từ vị trí 0, thì khi muốn đọc, ta phải quay về vị trí 0 bằng cách:
bb.position(0); // Set buffer position lại vị trí 0
“mapping” một vùng dữ liệu trên bộ nhớ lên file (disk).
Việc mapping này giúp khi bạn read/write data vào MappedByteBuffer, dữ liệu sẽ đồng thời ở trên bộ nhớ (virtual memory) và trên file (disk), việc sync dữ liệu sẽ được handle bởi OS (mmap và msync syscall) một cách hiệu quả.
FileChannel fc =
FileChannel.open(FileSystems.getDefault().getPath(path),
StandardOpenOption.WRITE,
StandardOpenOption.READ);
MappedByteBuffer mbb =
fc.map(FileChannel.MapMode.READ_WRITE,
0, // vị trí
fc.size()); // kích cỡ
Key-Value Store này sẽ có 2 phần:
- 1 phần là Journal Store (Data Store) để lưu dữ liệu
- 1 phần là Index Store (FixedHash) để lưu các index
protected final boolean createMessageJournalMBB(String journalPath) {
try {
// Tạo thư mục và file
File filePath = new File(journalFolder);
filePath.mkdir();
File file = new File(journalPath);
fileExists = file.exists();
journal = new RandomAccessFile(journalPath, "rw");
if (fileExists) {
// File đã tồn tài, tiếp tục xài file cũ
bufferSize = (int)journal.length();
} else {
// File mới, set size
journal.setLength(bufferSize);
}
channel = journal.getChannel();
buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);
if (!fileExists) {
// File Journal mới, tạo header file
writeJournalHeader(journal);
currentEnd = journal.getFilePointer();
} else {
// Lặp qua các vị trí hiện tại để tìm điểm kết thúc
currentEnd = scanJournal();
}
}
catch (Exception e) {
logger.log(Level.SEVERE, "createMessageJournalMBB Exception: ", e);
}
return false;
}
- Các record trong Journal Store sẽ được lưu một cách liên tiếp trên array bytes buffer.
- Cùng với data, mỗi record sẽ lưu thêm phần record size (đơn vị bytes), type của record và một flag isActive để đánh dấu là record này đã bị xóa hay chưa
- Các record có thể được traversal một cách liên tiếp. Khi muốn xóa một record, đơn giản ta chỉ đánh dấu lên record đó là isActive = false
- Các record mới được insert có thể được:
- insert vào cuối array bytes buffer.
- hoặc insert đè lên record có isActive = false và có size lớn hơn hoặc bằng record mới
- tìm 1 free (inactive) slot để insert record mới, nếu không tìm thấy thì insert vào vị trí cuối cùng trong array bytes buffer.
- Sau khi tìm được vị trí, việc lưu data của record sẽ được thực hiện như đã đề cập khi nãy (lưu thêm record header chứa size, type và flag isActive)
buffer.put(ACTIVE_RECORD); // 1 byte
buffer.put(type); // 1 byte
buffer.putInt(datalen); // 4 bytes
switch (type) {
case LONG_RECORD_TYPE:
buffer.putLong( (Long)val );
break;
case INT_RECORD_TYPE:
buffer.putInt( (Integer)val );
break;
case DOUBLE_RECORD_TYPE:
buffer.putDouble( (Double)val );
break;
case FLOAT_RECORD_TYPE:
buffer.putFloat( (Float)val );
break;
case SHORT_RECORD_TYPE:
buffer.putShort( (Short)val );
break;
case CHAR_RECORD_TYPE:
buffer.putChar( (char)val );
break;
case TEXT_RECORD_TYPE:
buffer.put( ((String)val).getBytes() );
break;
case BYTEARRAY_RECORD_TYPE:
buffer.put( (byte[])val );
break;
}
- key: hashCode() để lấy hashCode, sau đó có thể tìm ra được vị trí data bucket cần truy xuất ở bên Journal Store.
- ban đầu ta sẽ allocate memory cho Index Store bằng 1/4 của Journal Store.
- Ta sẽ insert hashCode của key trong record mới và vị trí của record đó trong DataStore vào IndexStore.
offset = getHashBucket(key.hashCode() );
indexBuffer.position(offset);
indexBuffer.mark();
byte occupied = indexBuffer.get();
if ( occupied == 0 ) {
// Đã tìm thấy slot phù hợp, reset position
indexBuffer.reset();
}
else {
// Dính collision rồi :(
collisions++;
offset = findBucket(key, offset, false);
// Chuyển tới vị trí khác
indexBuffer.position(offset);
}
// Write data
//
indexBuffer.put((byte)key.length() );
indexBuffer.putInt(key.hashCode());
if ( KEY_SIZE > 0 ) {
byte[] fixedKeyBytes = new byte[KEY_SIZE];
System.arraycopy(key.getBytes(),
0, fixedKeyBytes,
0, key.length());
indexBuffer.put( fixedKeyBytes );
}
indexBuffer.putLong( value ); // indexed record location
- findBucket:
while ( occupied > 0 && ! found) {
int keyHash = indexBuffer.getInt();
if ( keyHash == key.hashCode() ) {
if ( KEY_SIZE > 0 ) {
indexBuffer.position(
offset + 1 + Integer.BYTES + KEY_SIZE );
}
found = true;
break;
}
else {
offset += INDEX_ENTRY_SIZE_BYTES;
if ( offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) {
offset = INDEX_ENTRY_SIZE_BYTES;
}
indexBuffer.position(offset);
occupied = indexBuffer.get();
}
}
// Lấy position ra từ IndexStore
Long offset = index.get(key);
//...
// Nhảy tới vị trí đó
buffer.position(offset.intValue());
// Đầu tiên đọc header
byte active = buffer.get();
if (active != 1) {
return null;
}
byte type = buffer.get();
int dataLength = buffer.getInt();
// Sau đó đọc data
byte[] bytes;
switch ( type ) {
case LONG_RECORD_TYPE:
val = buffer.getLong();
break;
case INT_RECORD_TYPE:
val = buffer.getInt();
break;
case DOUBLE_RECORD_TYPE:
val = buffer.getDouble();
break;
case FLOAT_RECORD_TYPE:
val = buffer.getFloat();
break;
case SHORT_RECORD_TYPE:
val = buffer.getShort();
break;
case CHAR_RECORD_TYPE:
val = buffer.getChar();
break;
case BYTEARRAY_RECORD_TYPE:
bytes = new byte[dataLength];
buffer.get(bytes);
val = bytes;
break;
case TEXT_RECORD_TYPE:
bytes = new byte[dataLength];
buffer.get(bytes);
val = new String(bytes);
break;
}
Get data from indexStore
public Long get(String key) {
int bucketOffset = getHashBucket( key.hashCode() );
indexBuffer.position(bucketOffset);
byte occupied = indexBuffer.get();
if ( occupied > 0 ) {
bucketOffset = findBucket(key, offset, true);
}
if ( bucketOffset == -1 ) {
// Tìm không ra
return -1L;
}
// Trả về position của record trong DataStore
return indexBuffer.getLong();
}
// Tìm record
offset = getRecordOffset(key);
if (offset == -1) {
return false;
}
// Đọc header để lấy ra record size, sau đó đánh dấu nó là inactive
buffer.position(offset.intValue());
buffer.put(INACTIVE_RECORD);
buffer.put(EMPTY_RECORD_TYPE);
datalength = buffer.getInt();
// Lưu vị trí record bị xóa này để tái sử dụng sau
storeEmptyRecord(offset, datalength);
// Xóa record đó ra khỏi IndexStore
index.remove( key );
ByteBuffer newBuffer = ByteBuffer.allocateDirect((int)newLength);
if ( buffer.hasArray() ) {
byte[] array = buffer.array();
newBuffer.put( array );
}
else {
buffer.position(0);
newBuffer.put(buffer);
}
buffer = newBuffer;
journalLen = buffer.capacity();