Skip to content

Instantly share code, notes, and snippets.

@dmlloyd
Last active August 29, 2015 14:15
Show Gist options
  • Save dmlloyd/238895a28abda37d8fdd to your computer and use it in GitHub Desktop.
Save dmlloyd/238895a28abda37d8fdd to your computer and use it in GitHub Desktop.
Simplified buffer pool impl
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2015 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.examples;
import static java.lang.Math.min;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * A pool of fixed-size {@link ByteBuffer}s.
 * <p>
 * Pools built by {@link #create(int, boolean)} hand out buffers whose capacity is exactly
 * {@link #getSize()} and whose kind (heap or direct) matches {@link #isDirect()}.
 *
 * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
 */
public interface BufferPool2 {

    /**
     * Obtains a buffer from this pool, allocating new memory if no pooled buffer is available.
     *
     * @return a buffer taken from the pool or freshly allocated
     */
    ByteBuffer allocate();

    /**
     * Returns a buffer to this pool so that it can be handed out again.
     *
     * @param buffer the buffer to return
     */
    void free(ByteBuffer buffer);

    /**
     * Indicates which kind of buffer this pool manages.
     *
     * @return {@code true} if the pool manages direct buffers, {@code false} for heap buffers
     */
    boolean isDirect();

    /**
     * Reports the size, in bytes, of the buffers managed by this pool.
     *
     * @return the buffer size
     */
    int getSize();

    /**
     * Returns a view of this pool which passes every buffer through {@code Buffers.zero(buffer)}
     * before returning it to this pool.  All other operations are delegated unchanged.
     *
     * @return the zeroing view (calling {@code zeroing()} on the view returns the view itself)
     */
    default BufferPool2 zeroing() {
        final BufferPool2 delegate = this;
        return new BufferPool2() {
            @Override
            public ByteBuffer allocate() {
                return delegate.allocate();
            }

            @Override
            public void free(final ByteBuffer buffer) {
                // NOTE(review): Buffers is a project helper not visible here; presumably it
                // overwrites the buffer contents with zeros - confirm against its source.
                Buffers.zero(buffer);
                delegate.free(buffer);
            }

            @Override
            public boolean isDirect() {
                return delegate.isDirect();
            }

            @Override
            public int getSize() {
                return delegate.getSize();
            }

            @Override
            public BufferPool2 zeroing() {
                // Already zeroing; wrapping again would only zero twice.
                return this;
            }
        };
    }

    /**
     * Creates a pool which keeps a small per-thread cache of free buffers in front of a shared queue.
     * <p>
     * The requested size is rounded up to the next power of two and capped at {@code 0x40000000}
     * (the largest power of two representable as a positive {@code int}); the resulting value is
     * what {@link #getSize()} reports.  Buffers smaller than 64 KiB are carved out of 1 MiB master
     * allocations; larger buffers are allocated individually.
     *
     * @param size the minimum requested buffer size in bytes; must be positive
     * @param direct {@code true} to pool direct buffers, {@code false} to pool heap buffers
     * @return the new pool
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    static BufferPool2 create(int size, boolean direct) {
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size must be positive");
        }
        // Round up to a power of two.  The clamp is applied BEFORE the shift so that the result
        // can never overflow into a negative value; max() covers size == 1, for which
        // highestOneBit(0) is 0.
        final int finalSize = Math.max(1, min(Integer.highestOneBit(size - 1), 0x20000000) << 1);
        // Small buffers are sliced out of a 1 MiB region.  finalSize is a power of two no larger
        // than the region in that case, so the division is exact.
        final int slices = finalSize < 0x10000 ? 0x100000 / finalSize : 1;
        final int masterSize = finalSize * slices;
        // Maximum number of free buffers kept in one thread's local cache.
        final int localCacheLimit = 16;
        return new BufferPool2() {
            /** Free buffers shared between all threads. */
            private final ConcurrentLinkedQueue<ByteBuffer> masterQueue = new ConcurrentLinkedQueue<>();

            /** Per-thread cache of free buffers. */
            private final ThreadLocal<LocalCache> bufferQueue = new ThreadLocal<LocalCache>() {
                @Override
                protected LocalCache initialValue() {
                    return new LocalCache();
                }

                @Override
                public void remove() {
                    // Hand the cached buffers back to the shared queue, then really drop the
                    // thread's entry so the emptied cache does not linger.
                    get().empty();
                    super.remove();
                }
            };

            @Override
            public ByteBuffer allocate() {
                final LocalCache localCache = bufferQueue.get();

                // 1. Most recently freed buffer of this thread.
                final ByteBuffer cached = localCache.queue.pollLast();
                if (cached != null) {
                    return cached;
                }

                // 2. Shared queue; remember that this thread took one from it.
                final ByteBuffer shared = masterQueue.poll();
                if (shared != null) {
                    localCache.outstanding++;
                    return shared;
                }

                // 3. Nothing pooled: allocate a new master region of the requested kind.
                final ByteBuffer masterBuffer =
                        direct ? ByteBuffer.allocateDirect(masterSize) : ByteBuffer.allocate(masterSize);
                if (slices == 1) {
                    return masterBuffer;
                }
                // Pool every slice except the last one, which is returned to the caller.
                for (int i = 0; i < slices - 1; i++) {
                    masterBuffer.limit(masterBuffer.position() + finalSize);
                    fastFree(masterBuffer.slice(), localCache);
                    masterBuffer.position(masterBuffer.limit());
                }
                masterBuffer.limit(masterBuffer.capacity());
                assert masterBuffer.remaining() == finalSize;
                return masterBuffer.slice();
            }

            /**
             * {@inheritDoc}
             *
             * @throws IllegalArgumentException if the buffer's kind or capacity does not match this pool
             */
            @Override
            public void free(final ByteBuffer buffer) {
                if (isDirect() != buffer.isDirect() || getSize() != buffer.capacity()) {
                    throw new IllegalArgumentException("Wrong buffer returned to pool");
                }
                fastFree(buffer, bufferQueue.get());
            }

            /**
             * Resets the buffer and stores it in the given local cache, or in the shared queue
             * when the local cache is full or the thread's {@code outstanding} counter is high.
             */
            private void fastFree(final ByteBuffer buffer, final LocalCache localCache) {
                buffer.clear();
                // NOTE(review): outstanding is decremented on every free (including the slices
                // pooled during allocation) but incremented only when a buffer is taken from the
                // shared queue, so it drifts negative in normal use - confirm the intended accounting.
                if (localCache.outstanding-- >= localCacheLimit || localCache.queue.size() == localCacheLimit) {
                    masterQueue.add(buffer);
                } else {
                    localCache.queue.add(buffer);
                }
            }

            @Override
            public boolean isDirect() {
                return direct;
            }

            @Override
            public int getSize() {
                return finalSize;
            }

            /** Free buffers held by a single thread, plus that thread's bookkeeping counter. */
            class LocalCache {
                final ArrayDeque<ByteBuffer> queue = new ArrayDeque<>();
                int outstanding;

                /** Returns any still-cached buffers to the shared queue when this cache is collected. */
                @Override
                protected void finalize() throws Throwable {
                    empty();
                }

                /** Moves every cached buffer to the shared queue. */
                void empty() {
                    ByteBuffer buffer;
                    while ((buffer = queue.poll()) != null) {
                        masterQueue.add(buffer);
                    }
                }
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment