Created
November 11, 2020 13:03
-
-
Save ennerf/d0c34860913d77b9f743be7276d091cf to your computer and use it in GitHub Desktop.
Memory mapped buffer pool for MAT File Library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package us.hebi.core.io.mat; | |
| import com.google.common.collect.Lists; | |
| import us.hebi.glue.Buffer6; | |
| import us.hebi.glue.DirectBuffer6; | |
| import us.hebi.glue.Unsafe9; | |
| import us.hebi.matlab.mat.format.BufferAllocator; | |
| import java.io.Closeable; | |
| import java.io.File; | |
| import java.io.IOException; | |
| import java.io.RandomAccessFile; | |
| import java.nio.ByteBuffer; | |
| import java.nio.ByteOrder; | |
| import java.nio.channels.FileChannel; | |
| import java.util.List; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import static us.hebi.matlab.mat.util.Preconditions.*; | |
| /** | |
| * Allocates CACHE_LINE aligned direct or memory-mapped | |
| * buffers and reuses them if possible. | |
| * | |
| * @author Florian Enner | |
| * @since 31 Oct 2018 | |
| */ | |
| class MappedBufferPool implements BufferAllocator, Closeable { | |
| MappedBufferPool(File tmpFolder) { | |
| this.tmpFolder = checkNotNull(tmpFolder); | |
| } | |
| @Override | |
| public ByteBuffer allocate(int numBytes) { | |
| // Try to reuse an available one if possible | |
| ByteBuffer pooledBuffer = searchBuffer(numBytes); | |
| if (pooledBuffer != null) | |
| return pooledBuffer; | |
| // Sanity size check | |
| if (numBytes > MAX_ALLOCATION_SIZE) | |
| throw new IllegalArgumentException("Exceeds maximum buffer size"); | |
| // Otherwise allocate an aligned one | |
| final int capacity = numBytes + CACHE_LINE_SIZE + REUSE_PADDING; | |
| final ByteBuffer directBuffer = capacity <= MAP_THRESHOLD ? | |
| allocateDirect(capacity) : allocateMapped(capacity); | |
| // Align to the next cache line | |
| final long address = DirectBuffer6.getAddress(directBuffer); | |
| final int remainder = (int) (address & (CACHE_LINE_SIZE - 1)); | |
| // Align buffer such that position zero starts aligned. Note that | |
| // limit() may be <= capacity() as per interface contract, so we | |
| // can expose a higher capacity to increase reuse probability. | |
| ByteBuffer alignedBuffer = directBuffer; | |
| if (remainder > 0) { | |
| final int offset = CACHE_LINE_SIZE - remainder; | |
| Buffer6.setPosition(directBuffer, offset); | |
| alignedBuffer = directBuffer.slice(); | |
| } | |
| // Set appropriate limit | |
| alignedBuffer.order(ByteOrder.nativeOrder()); | |
| Buffer6.setLimit(alignedBuffer, numBytes); | |
| return alignedBuffer; | |
| } | |
| private ByteBuffer allocateDirect(int capacity) { | |
| final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + CACHE_LINE_SIZE); | |
| registerCloser(new Closeable() { | |
| @Override | |
| public void close() throws IOException { | |
| Unsafe9.invokeCleaner(buffer); | |
| } | |
| }); | |
| return buffer; | |
| } | |
| private ByteBuffer allocateMapped(int capacity) { | |
| try { | |
| createTempFolder(); | |
| final File tmpFile = new File(tmpFolder, fileCount.getAndIncrement() + ".tmp"); | |
| final FileChannel channel = new RandomAccessFile(tmpFile, "rw").getChannel(); | |
| final ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity) | |
| .load(); | |
| registerCloser(new Closeable() { | |
| @Override | |
| public void close() throws IOException { | |
| Unsafe9.invokeCleaner(buffer); | |
| channel.close(); | |
| if (!tmpFile.delete()) { | |
| System.out.println("Could not delete temporary file: " + tmpFile); | |
| } | |
| } | |
| }); | |
| return buffer; | |
| } catch (IOException ioe) { | |
| throw new RuntimeException(ioe); | |
| } | |
| } | |
| private synchronized void createTempFolder() { | |
| if (fileCount.get() == 0) { | |
| // Try deleting any old directory | |
| if (tmpFolder.exists() && !tmpFolder.delete()) { | |
| throw new IllegalStateException("Failed to overwrite existing temporary folder: " + tmpFolder.getAbsolutePath()); | |
| } | |
| // Create new directory | |
| if (!tmpFolder.mkdirs()) { | |
| throw new IllegalStateException("Failed to create temporary folder: " + tmpFolder.getAbsolutePath()); | |
| } | |
| } | |
| } | |
| private synchronized void registerCloser(Closeable closeable) { | |
| closers.add(closeable); | |
| } | |
| private synchronized ByteBuffer searchBuffer(int capacity) { | |
| for (ByteBuffer buffer : pool) { | |
| if (buffer.capacity() >= capacity) { | |
| Buffer6.setPosition(buffer, 0); | |
| Buffer6.setLimit(buffer, capacity); | |
| pool.remove(buffer); | |
| return buffer; | |
| } | |
| } | |
| return null; | |
| } | |
| @Override | |
| public synchronized void release(ByteBuffer buffer) { | |
| pool.add(buffer); | |
| } | |
| @Override | |
| public synchronized void close() throws IOException { | |
| pool.clear(); | |
| for (Closeable closer : closers) { | |
| closer.close(); | |
| } | |
| closers.clear(); | |
| // Delete temporary folder | |
| if (fileCount.get() > 0 && tmpFolder.exists() && !tmpFolder.delete()) { | |
| System.err.println("Could not delete temporary folder. It may have been accessed by another application."); | |
| } | |
| } | |
| private final AtomicInteger fileCount = new AtomicInteger(0); | |
| List<ByteBuffer> pool = Lists.newArrayList(); | |
| List<Closeable> closers = Lists.newArrayList(); | |
| private final File tmpFolder; | |
| private static final int MAP_THRESHOLD = 2 * 1024 * 1024; // 2 MB | |
| private static final int CACHE_LINE_SIZE = 64; // single cache line | |
| private static final int REUSE_PADDING = 64; // 2nd step writes data + array header (name, dim, type, etc.) | |
| private static final long MAX_ALLOCATION_SIZE = Integer.MAX_VALUE - CACHE_LINE_SIZE - REUSE_PADDING; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment