Skip to content

Instantly share code, notes, and snippets.

@ennerf
Created November 11, 2020 13:03
Show Gist options
  • Select an option

  • Save ennerf/d0c34860913d77b9f743be7276d091cf to your computer and use it in GitHub Desktop.

Select an option

Save ennerf/d0c34860913d77b9f743be7276d091cf to your computer and use it in GitHub Desktop.
Memory-mapped buffer pool for MAT File Library
package us.hebi.core.io.mat;
import com.google.common.collect.Lists;
import us.hebi.glue.Buffer6;
import us.hebi.glue.DirectBuffer6;
import us.hebi.glue.Unsafe9;
import us.hebi.matlab.mat.format.BufferAllocator;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static us.hebi.matlab.mat.util.Preconditions.*;
/**
 * Allocates CACHE_LINE aligned direct or memory-mapped
 * buffers and reuses them if possible.
 *
 * <p>Requests whose padded capacity is at most {@code MAP_THRESHOLD} are served
 * from direct buffers; larger requests are backed by temporary files inside the
 * folder passed to the constructor. Released buffers are kept in a pool and are
 * handed out again to any later request that fits into them. All native
 * resources are freed by {@link #close()}.
 *
 * <p>The pool list, the closer list and the temporary-folder state are only
 * touched from {@code synchronized} methods of this instance.
 *
 * @author Florian Enner
 * @since 31 Oct 2018
 */
class MappedBufferPool implements BufferAllocator, Closeable {

    /**
     * @param tmpFolder directory that holds the backing files of memory-mapped
     *                  buffers; it is (re)created lazily on the first mapped
     *                  allocation. Must not be null.
     */
    MappedBufferPool(File tmpFolder) {
        this.tmpFolder = checkNotNull(tmpFolder);
    }

    /**
     * Returns a buffer with position zero and a limit of {@code numBytes},
     * reusing a pooled buffer if one is large enough.
     *
     * @param numBytes requested number of usable bytes, must not be negative
     * @return a pooled buffer, or a newly allocated native-order buffer whose
     *         position zero starts on a cache line boundary
     * @throws IllegalArgumentException if {@code numBytes} is negative, or if no
     *                                  pooled buffer fits and {@code numBytes}
     *                                  exceeds the maximum allocation size
     * @throws RuntimeException         if a memory-mapped buffer could not be created
     */
    @Override
    public ByteBuffer allocate(int numBytes) {
        // Reject invalid sizes before any native memory gets allocated for them
        if (numBytes < 0) {
            throw new IllegalArgumentException("Buffer size must not be negative: " + numBytes);
        }

        // Try to reuse an available one if possible
        ByteBuffer pooledBuffer = searchBuffer(numBytes);
        if (pooledBuffer != null) {
            return pooledBuffer;
        }

        // Sanity size check (keeps the padded capacity below from overflowing an int)
        if (numBytes > MAX_ALLOCATION_SIZE) {
            throw new IllegalArgumentException("Exceeds maximum buffer size");
        }

        // Otherwise allocate an aligned one
        final int capacity = numBytes + CACHE_LINE_SIZE + REUSE_PADDING;
        final ByteBuffer directBuffer = capacity <= MAP_THRESHOLD ?
                allocateDirect(capacity) : allocateMapped(capacity);

        // Align to the next cache line
        final long address = DirectBuffer6.getAddress(directBuffer);
        final int remainder = (int) (address & (CACHE_LINE_SIZE - 1));

        // Align buffer such that position zero starts aligned. Note that
        // limit() may be <= capacity() as per interface contract, so we
        // can expose a higher capacity to increase reuse probability.
        ByteBuffer alignedBuffer = directBuffer;
        if (remainder > 0) {
            final int offset = CACHE_LINE_SIZE - remainder;
            Buffer6.setPosition(directBuffer, offset);
            alignedBuffer = directBuffer.slice();
        }

        // Set appropriate byte order and limit on the buffer that is handed out
        alignedBuffer.order(ByteOrder.nativeOrder());
        Buffer6.setLimit(alignedBuffer, numBytes);
        return alignedBuffer;
    }

    /**
     * Allocates a direct buffer with one extra cache line of capacity and
     * registers a closer that invokes its cleaner when the pool is closed.
     */
    private ByteBuffer allocateDirect(int capacity) {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + CACHE_LINE_SIZE);
        registerCloser(new Closeable() {
            @Override
            public void close() throws IOException {
                Unsafe9.invokeCleaner(buffer);
            }
        });
        return buffer;
    }

    /**
     * Maps a new temporary file of the given size into memory and registers a
     * closer that cleans the mapping, closes the channel and deletes the file.
     * If the mapping cannot be established, the channel is closed and the file
     * is removed again before the failure is reported.
     *
     * @throws RuntimeException wrapping the {@link IOException} that caused the failure
     */
    private ByteBuffer allocateMapped(int capacity) {
        try {
            createTempFolder();
            final File tmpFile = new File(tmpFolder, fileCount.getAndIncrement() + ".tmp");
            final FileChannel channel = new RandomAccessFile(tmpFile, "rw").getChannel();

            boolean registered = false;
            try {
                final ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity)
                        .load();
                registerCloser(new Closeable() {
                    @Override
                    public void close() throws IOException {
                        // Keep the order (clean, close, delete), but make sure that a
                        // failing step does not skip the ones after it.
                        try {
                            Unsafe9.invokeCleaner(buffer);
                        } finally {
                            try {
                                channel.close();
                            } finally {
                                if (!tmpFile.delete()) {
                                    System.err.println("Could not delete temporary file: " + tmpFile);
                                }
                            }
                        }
                    }
                });
                registered = true;
                return buffer;
            } finally {
                if (!registered) {
                    // Nothing will ever close this channel for us, so release it here
                    try {
                        channel.close();
                    } catch (IOException ignored) {
                        // the original failure is the one worth reporting
                    }
                    // Best effort: the file is useless without a mapping
                    tmpFile.delete();
                }
            }
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Creates the temporary folder once, replacing an existing (empty) one.
     * The decision is based on a flag guarded by this method's lock rather
     * than on the file counter, which is incremented outside of the lock.
     *
     * @throws IllegalStateException if an existing folder cannot be deleted or
     *                               the new folder cannot be created
     */
    private synchronized void createTempFolder() {
        if (tmpFolderCreated) {
            return;
        }

        // Try deleting any old directory
        if (tmpFolder.exists() && !tmpFolder.delete()) {
            throw new IllegalStateException("Failed to overwrite existing temporary folder: " + tmpFolder.getAbsolutePath());
        }

        // Create new directory
        if (!tmpFolder.mkdirs()) {
            throw new IllegalStateException("Failed to create temporary folder: " + tmpFolder.getAbsolutePath());
        }
        tmpFolderCreated = true;
    }

    /** Remembers a cleanup action that is run by {@link #close()}. */
    private synchronized void registerCloser(Closeable closeable) {
        closers.add(closeable);
    }

    /**
     * Removes and returns the first pooled buffer whose capacity is at least
     * {@code capacity}, with position zero and the limit set to {@code capacity}.
     *
     * @return a reusable buffer, or null if none is large enough
     */
    private synchronized ByteBuffer searchBuffer(int capacity) {
        for (int i = 0; i < pool.size(); i++) {
            final ByteBuffer candidate = pool.get(i);
            if (candidate.capacity() >= capacity) {
                Buffer6.setPosition(candidate, 0);
                Buffer6.setLimit(candidate, capacity);
                // Remove by index so that exactly this instance is taken out;
                // remove(Object) would compare buffers by content via equals().
                pool.remove(i);
                return candidate;
            }
        }
        return null;
    }

    /**
     * Puts the buffer into the pool so that later allocations can reuse it.
     *
     * @param buffer buffer that is no longer used by the caller
     */
    @Override
    public synchronized void release(ByteBuffer buffer) {
        pool.add(buffer);
    }

    /**
     * Empties the pool, runs every registered closer and removes the temporary
     * folder if this pool created it. A closer that fails with an
     * {@link IOException} does not prevent the remaining closers from running.
     *
     * @throws IOException the first I/O failure reported by a closer, rethrown
     *                     after all closers have been run
     */
    @Override
    public synchronized void close() throws IOException {
        pool.clear();

        IOException firstFailure = null;
        for (Closeable closer : closers) {
            try {
                closer.close();
            } catch (IOException ioe) {
                if (firstFailure == null) {
                    firstFailure = ioe;
                }
            }
        }
        closers.clear();

        // Delete temporary folder
        if (tmpFolderCreated) {
            if (tmpFolder.exists() && !tmpFolder.delete()) {
                System.err.println("Could not delete temporary folder. It may have been accessed by another application.");
            } else {
                // The folder is gone, so a later mapped allocation has to recreate it
                tmpFolderCreated = false;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    // Running number used to name the temporary files
    private final AtomicInteger fileCount = new AtomicInteger(0);

    // True while the temporary folder created by this pool is in use (guarded by 'this')
    private boolean tmpFolderCreated = false;

    // Released buffers that are available for reuse (guarded by 'this')
    List<ByteBuffer> pool = Lists.newArrayList();

    // Cleanup actions for every buffer that was ever allocated (guarded by 'this')
    List<Closeable> closers = Lists.newArrayList();

    private final File tmpFolder;
    private static final int MAP_THRESHOLD = 2 * 1024 * 1024; // 2 MB
    private static final int CACHE_LINE_SIZE = 64; // single cache line
    private static final int REUSE_PADDING = 64; // 2nd step writes data + array header (name, dim, type, etc.)
    private static final long MAX_ALLOCATION_SIZE = Integer.MAX_VALUE - CACHE_LINE_SIZE - REUSE_PADDING;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment