Skip to content

Instantly share code, notes, and snippets.

@gc-garcol
Last active July 3, 2024 14:11
Show Gist options
  • Save gc-garcol/d199a1cf3d1f243d874ab521c07d6c3b to your computer and use it in GitHub Desktop.
Save gc-garcol/d199a1cf3d1f243d874ab521c07d6c3b to your computer and use it in GitHub Desktop.
java-off-heap-database

java-off-heap-database

Java NIO ByteBuffer class

  • Cấp phát bộ nhớ cho ByteBuffer sử dụng method là allocateDirect (thay vì allocate), khi xài method này thì Java sẽ gọi thẳng tới lệnh cấp phát bộ nhớ từ OS, và bộ nhớ được cấp phát sẽ không nằm trong sự quản lí của Java Heap Space cũng như Java GC.
Person person = new Person();
//…
ByteBuffer bb = ByteBuffer.allocateDirect(size);
bb.putInt(person.age);
bb.putFloat(person.weeklySalary);
bb.put(person.lastName.getBytes());
bb.put(person.firstName.getBytes());
bb.put((byte)(person.fullTime == true ? 1 : 0 ));
  • Với mỗi lần write, con trỏ tự động được cập nhật lại ở vị trí tiếp theo
Person person = new Person();
Person.age = bb.getInt();
person.weeklySalary = bb.getFloat();
  • Để read một danh sách các dữ liệu, ta cần phải biết (và set) vị trí con trỏ nơi bắt đầu. Ví dụ, ở trên bạn đã write dữ liệu vào ByteBuffer từ vị trí 0, thì khi muốn đọc, ta phải quay về vị trí 0 bằng cách:
bb.position(0); // Set buffer position lại vị trí 0

Persistence data với MappedByteBuffer

“mapping” một vùng dữ liệu trên bộ nhớ lên file (disk).

Việc mapping này giúp khi bạn read/write data vào MappedByteBuffer, dữ liệu sẽ đồng thời ở trên bộ nhớ (virtual memory) và trên file (disk), việc sync dữ liệu sẽ được handle bởi OS (mmap và msync syscall) một cách hiệu quả.

FileChannel fc =
    FileChannel.open(FileSystems.getDefault().getPath(path), 
                     StandardOpenOption.WRITE,
                     StandardOpenOption.READ);
MappedByteBuffer mbb = 
    fc.map(FileChannel.MapMode.READ_WRITE, 
           0,          // vị trí
           fc.size()); // kích cỡ

Implement NoHeap key-value store

Key-Value Store này sẽ có 2 phần:

  • 1 phần là Journal Store (Data Store) để lưu dữ liệu
  • 1 phần là Index Store (FixedHash) để lưu các index

Đầu tiên, tạo 1 persistence store bằng RandomAccessFile, FileChannel và MappedByteBuffer

protected final boolean createMessageJournalMBB(String journalPath) {
    try {
        // Tạo thư mục và file
        File filePath = new File(journalFolder);
        filePath.mkdir();
        File file = new File(journalPath);
        fileExists = file.exists();

        journal = new RandomAccessFile(journalPath, "rw");
        if (fileExists)  {
            // File đã tồn tài, tiếp tục xài file cũ
            bufferSize = (int)journal.length();
        } else {
            // File mới, set size
            journal.setLength(bufferSize);
        }

        channel = journal.getChannel();
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize);

        if (!fileExists) {
            // File Journal mới, tạo header file
            writeJournalHeader(journal);
            currentEnd = journal.getFilePointer();
        } else {
            // Lặp qua các vị trí hiện tại để tìm điểm kết thúc
            currentEnd = scanJournal();
        }
    }
    catch (Exception e) {
        logger.log(Level.SEVERE, "createMessageJournalMBB Exception: ", e);
    }

    return false;
}

Cấu trúc Journal Store

  • Các record trong Journal Store sẽ được lưu một cách liên tiếp trên array bytes buffer.
  • Cùng với data, mỗi record sẽ lưu thêm phần record size (đơn vị bytes), type của record và một flag isActive để đánh dấu là record này đã bị xóa hay chưa
  • Các record có thể được traversal một cách liên tiếp. Khi muốn xóa một record, đơn giản ta chỉ đánh dấu lên record đó là isActive = false
  • Các record mới được insert có thể được:
    • insert vào cuối array bytes buffer.
    • hoặc insert đè lên record có isActive = false và có size lớn hơn hoặc bằng record mới

insert new by replace old record

records structure

Store a record

  • tìm 1 free (inactive) slot để insert record mới, nếu không tìm thấy thì insert vào vị trí cuối cùng trong array bytes buffer.
  • Sau khi tìm được vị trí, việc lưu data của record sẽ được thực hiện như đã đề cập khi nãy (lưu thêm record header chứa size, type và flag isActive)
buffer.put(ACTIVE_RECORD); // 1 byte
buffer.put(type);          // 1 byte
buffer.putInt(datalen);    // 4 bytes

switch (type) {
    case LONG_RECORD_TYPE:
        buffer.putLong( (Long)val );
        break;
    case INT_RECORD_TYPE:
        buffer.putInt( (Integer)val );
        break;
    case DOUBLE_RECORD_TYPE:
        buffer.putDouble( (Double)val );
        break;
    case FLOAT_RECORD_TYPE:
        buffer.putFloat( (Float)val );
        break;
    case SHORT_RECORD_TYPE:
        buffer.putShort( (Short)val );
        break;
    case CHAR_RECORD_TYPE:
        buffer.putChar( (char)val );
        break;
    case TEXT_RECORD_TYPE:
        buffer.put( ((String)val).getBytes() );
        break;
    case BYTEARRAY_RECORD_TYPE:
        buffer.put( (byte[])val );
        break;
}

Việc lưu record vào Store đã xong, tiếp theo bước cuối cùng là index record

  • key: hashCode() để lấy hashCode, sau đó có thể tìm ra được vị trí data bucket cần truy xuất ở bên Journal Store.
  • ban đầu ta sẽ allocate memory cho Index Store bằng 1/4 của Journal Store.
  • Ta sẽ insert hashCode của key trong record mới và vị trí của record đó trong DataStore vào IndexStore.
offset = getHashBucket(key.hashCode() );
indexBuffer.position(offset);
indexBuffer.mark();
byte occupied = indexBuffer.get();
if ( occupied == 0 ) {
    // Đã tìm thấy slot phù hợp, reset position
    indexBuffer.reset();
}
else {
    // Dính collision rồi :(
    
    collisions++;

    offset = findBucket(key, offset, false);

    // Chuyển tới vị trí khác
    indexBuffer.position(offset);
}

// Write data
//
indexBuffer.put((byte)key.length() );
indexBuffer.putInt(key.hashCode()); 
if ( KEY_SIZE > 0 ) {
    byte[] fixedKeyBytes = new byte[KEY_SIZE];
    System.arraycopy(key.getBytes(), 
                    0, fixedKeyBytes, 
                    0, key.length());
    indexBuffer.put( fixedKeyBytes );
}
indexBuffer.putLong( value ); // indexed record location
  • findBucket:
while ( occupied > 0 && ! found) {
    int keyHash = indexBuffer.getInt();
    if ( keyHash == key.hashCode() ) {
        if ( KEY_SIZE > 0 ) {
            indexBuffer.position(
                    offset + 1 + Integer.BYTES + KEY_SIZE );
        }
        found = true;
        break;
    }
    else {
        offset += INDEX_ENTRY_SIZE_BYTES;
        if ( offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) {
            offset = INDEX_ENTRY_SIZE_BYTES;
        }

        indexBuffer.position(offset);
        occupied = indexBuffer.get();
    }
}

Get data từ store

// Lấy position ra từ IndexStore
Long offset = index.get(key);
//...

// Nhảy tới vị trí đó
buffer.position(offset.intValue());

// Đầu tiên đọc header
byte active = buffer.get();
if (active != 1) {
    return null;
}

byte type = buffer.get();

int dataLength = buffer.getInt();

// Sau đó đọc data
byte[] bytes;
switch ( type ) {
    case LONG_RECORD_TYPE:
        val = buffer.getLong();
        break;
    case INT_RECORD_TYPE:
        val = buffer.getInt();
        break;
    case DOUBLE_RECORD_TYPE:
        val = buffer.getDouble();
        break;
    case FLOAT_RECORD_TYPE:
        val = buffer.getFloat();
        break;
    case SHORT_RECORD_TYPE:
        val = buffer.getShort();
        break;
    case CHAR_RECORD_TYPE:
        val = buffer.getChar();
        break;
    case BYTEARRAY_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = bytes;
        break;
    case TEXT_RECORD_TYPE:
        bytes = new byte[dataLength];
        buffer.get(bytes);
        val = new String(bytes);
        break;
}

Get data from indexStore

public Long get(String key) {
    int bucketOffset = getHashBucket( key.hashCode() );
    indexBuffer.position(bucketOffset);
    byte occupied = indexBuffer.get();
    if ( occupied > 0 ) {
        bucketOffset = findBucket(key, offset, true);
    }

    if ( bucketOffset == -1 ) {
        // Tìm không ra
        return -1L;
    }

    // Trả về position của record trong DataStore
    return indexBuffer.getLong();
}

Delete records

// Tìm record
offset = getRecordOffset(key);
if (offset == -1) {
    return false;
}

// Đọc header để lấy ra record size, sau đó đánh dấu nó là inactive
buffer.position(offset.intValue()); 
buffer.put(INACTIVE_RECORD);
buffer.put(EMPTY_RECORD_TYPE);
datalength = buffer.getInt();

// Lưu vị trí record bị xóa này để tái sử dụng sau
storeEmptyRecord(offset, datalength);

// Xóa record đó ra khỏi IndexStore
index.remove( key );

Expand DataStore khi sắp xài hết

ByteBuffer newBuffer = ByteBuffer.allocateDirect((int)newLength);
if ( buffer.hasArray() ) {
    byte[] array = buffer.array();
    newBuffer.put( array );
}
else {
    buffer.position(0);
    newBuffer.put(buffer);
}
buffer = newBuffer;
journalLen = buffer.capacity();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment