Skip to content

Instantly share code, notes, and snippets.

@dmlloyd
Last active August 29, 2015 14:15
Show Gist options
  • Save dmlloyd/924e7fbf16b318a19e6b to your computer and use it in GitHub Desktop.
Save dmlloyd/924e7fbf16b318a19e6b to your computer and use it in GitHub Desktop.
A ByteBuffer queue for scatter/gather operations
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2015 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.examples;
import static java.lang.Math.max;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.Arrays;
/**
 * A fixed-capacity FIFO queue of {@link ByteBuffer}s backed by a single array.
 * <p>
 * The queued buffers always occupy the contiguous slice {@code buffers[pos .. lim)}, so the backing
 * array can be passed directly to the array-based scatter/gather channel operations. Slots outside
 * that slice are kept {@code null} so drained buffers are not retained.
 * <p>
 * This class performs no synchronization.
 *
 * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
 */
public final class ByteBufferBuffer {

    // drain position: index of the next buffer returned by get()
    private int pos;
    // fill position: index of the next free slot written by put()
    private int lim;
    private final ByteBuffer[] buffers;

    /**
     * Creates an empty queue.
     *
     * @param capacity the maximum number of buffers the queue can hold
     */
    public ByteBufferBuffer(final int capacity) {
        this.buffers = new ByteBuffer[capacity];
        // pos and lim both start at 0: nothing has been queued yet
    }

    /**
     * Removes and returns the buffer at the head of the queue.
     *
     * @return the head buffer
     * @throws BufferUnderflowException if the queue is empty
     */
    public ByteBuffer get() {
        final int pos = this.pos;
        final int lim = this.lim;
        if (pos == lim) {
            throw new BufferUnderflowException();
        }
        final ByteBuffer[] buffers = this.buffers;
        final ByteBuffer head = buffers[pos];
        // clear the slot so the queue does not keep the drained buffer reachable
        buffers[pos] = null;
        if (pos + 1 == lim) {
            // queue became empty: rewind so the whole array is available again without compacting
            this.pos = 0;
            this.lim = 0;
        } else {
            this.pos = pos + 1;
        }
        return head;
    }

    /**
     * Returns the buffer at the head of the queue without removing it.
     *
     * @return the head buffer
     * @throws BufferUnderflowException if the queue is empty
     */
    public ByteBuffer peek() {
        if (pos == lim) {
            throw new BufferUnderflowException();
        }
        return buffers[pos];
    }

    /**
     * Pushes a buffer back onto the head of the queue, so that it is the next one returned by
     * {@link #get()}.
     *
     * @param buffer the buffer to push back
     * @throws BufferOverflowException if the queue is full
     */
    public void unget(ByteBuffer buffer) {
        final ByteBuffer[] buffers = this.buffers;
        final int pos = this.pos;
        if (pos > 0) {
            // there is a free slot right before the head
            buffers[this.pos = pos - 1] = buffer;
            return;
        }
        final int lim = this.lim;
        if (lim == buffers.length) {
            throw new BufferOverflowException();
        }
        // head is at index 0: shift everything one slot to the right to make room
        System.arraycopy(buffers, 0, buffers, 1, lim);
        buffers[0] = buffer;
        this.lim = lim + 1;
    }

    /**
     * Appends a buffer to the tail of the queue, compacting the backing array first if the tail has
     * reached the end of the array but slots are free at the front.
     *
     * @param buffer the buffer to append
     * @throws BufferOverflowException if the queue is full
     */
    public void put(ByteBuffer buffer) {
        if (lim == buffers.length) {
            if (pos == 0) {
                throw new BufferOverflowException();
            }
            compact();
        }
        // re-read lim from the field: compact() may have just moved it
        buffers[lim++] = buffer;
    }

    /**
     * Moves the queued buffers to the start of the backing array so that all free slots are at the
     * tail. Does nothing if the head is already at index 0.
     */
    public void compact() {
        final int pos = this.pos;
        if (pos == 0) {
            return;
        }
        final int lim = this.lim;
        final ByteBuffer[] buffers = this.buffers;
        final int rem = lim - pos;
        System.arraycopy(buffers, pos, buffers, 0, rem);
        // null out the slots that are no longer part of the queue
        Arrays.fill(buffers, rem, lim, null);
        this.pos = 0;
        this.lim = rem;
    }

    /**
     * @return {@code true} if at least one buffer is queued
     */
    public boolean hasRemaining() {
        return pos < lim;
    }

    /**
     * @return the number of queued buffers
     */
    public int remaining() {
        return lim - pos;
    }

    /**
     * @return {@code true} if at least one more buffer can be queued
     */
    public boolean hasSpace() {
        return pos > 0 || lim < buffers.length;
    }

    /**
     * @return the number of additional buffers that can be queued
     */
    public int space() {
        return buffers.length - (lim - pos);
    }

    /**
     * Performs one gathering write of all queued buffers to the channel, then removes every leading
     * buffer that has no bytes remaining and hands it to {@code returnPool}.
     *
     * @param channel the channel to write to
     * @param returnPool the pool that receives fully drained buffers
     * @return the value returned by the channel's gathering write
     * @throws IOException if the channel write fails
     */
    public long emptyTo(GatheringByteChannel channel, BufferPool2 returnPool) throws IOException {
        final long written = channel.write(buffers, pos, remaining());
        while (hasRemaining() && !peek().hasRemaining()) {
            returnPool.free(get());
        }
        return written;
    }

    /**
     * Tops the queue up to capacity with buffers taken from {@code sourcePool}, performs one
     * scattering read from the channel into all queued buffers, and finally returns to the pool any
     * trailing buffers allocated by this call that were left untouched (position still 0).
     * <p>
     * Buffers are not flipped by this method. The read targets every queued buffer, including ones
     * that were already queued before the call.
     *
     * @param channel the channel to read from
     * @param sourcePool the pool that supplies fresh buffers and takes back unused ones
     * @return the value returned by the channel's scattering read
     * @throws IOException if the channel read fails
     */
    public long fillFrom(ScatteringByteChannel channel, BufferPool2 sourcePool) throws IOException {
        compact();
        // after compact() pos is 0, so put() below never moves elements and this index stays valid
        final int firstNew = lim;
        try {
            while (hasSpace()) {
                put(sourcePool.allocate());
            }
            return channel.read(buffers, pos, remaining());
        } finally {
            // Trim from the tail, stopping at the first buffer that received data. Only buffers
            // allocated above are candidates, and each is removed from the queue before it is
            // freed so the queue never exposes a buffer that has gone back to the pool.
            final ByteBuffer[] buffers = this.buffers;
            while (lim > firstNew && buffers[lim - 1].position() == 0) {
                final ByteBuffer unused = buffers[--lim];
                buffers[lim] = null;
                sourcePool.free(unused);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment