Skip to content

Instantly share code, notes, and snippets.

@slomo
Created June 19, 2019 20:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slomo/413a771d341d2e69f2ff434061674fe3 to your computer and use it in GitHub Desktop.
Save slomo/413a771d341d2e69f2ff434061674fe3 to your computer and use it in GitHub Desktop.
SeekableFileChannel from Azure Blob Storage
package default;
import com.microsoft.azure.storage.blob.BlobRange;
import com.microsoft.azure.storage.blob.BlockBlobURL;
import com.microsoft.azure.storage.blob.DownloadResponse;
import com.microsoft.azure.storage.blob.ReliableDownloadOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
public class SeekableBlobByteChannel implements SeekableByteChannel {
private final BlockBlobURL blockBlobURL;
private final byte[] buffer;
private final int bufferSize;
private long size = -1;
private long position = -1;
private long bufferIndex = -1;
private boolean open = true;
public SeekableBlobByteChannel(BlockBlobURL blockBlobURL, int bufferSize) {
this.blockBlobURL = blockBlobURL;
this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize];
}
public int read(ByteBuffer byteBuffer) throws IOException {
long startBufferIndex = position / bufferSize * bufferSize;
if (position >= size) {
return -1;
}
if (startBufferIndex != bufferIndex) {
DownloadResponse downloadResponse = blockBlobURL
.download(new BlobRange().withOffset(startBufferIndex).withCount((long) bufferSize), null, false, null)
.blockingGet();
Iterable<ByteBuffer> blobDataSnippets = downloadResponse.body(
new ReliableDownloadOptions().withMaxRetryRequests(0)
).blockingIterable();
int bufferOffset = 0;
for (ByteBuffer blobData : blobDataSnippets) {
int length = blobData.remaining();
blobData.get(buffer, bufferOffset, length);
bufferOffset += length;
}
bufferIndex = startBufferIndex;
}
int bufferOffset = (int)(position - bufferIndex);
int dataLength = Math.min(byteBuffer.remaining(), bufferSize - bufferOffset);
byteBuffer.put(buffer, bufferOffset, dataLength);
position += dataLength;
return dataLength;
}
public int write(ByteBuffer byteBuffer) throws IOException {
throw new IOException(this + " is read only");
}
public long position() throws IOException {
return position;
}
public SeekableByteChannel position(long newPosition) throws IOException {
if (position >= size()) {
throw new IOException("Cannot read after end");
}
position = newPosition;
return this;
}
public long size() throws IOException {
if (size < 0) {
size = blockBlobURL.getProperties().blockingGet().headers().contentLength();
}
return size;
}
public SeekableByteChannel truncate(long l) throws IOException {
throw new IOException(this + " is read only");
}
public boolean isOpen() {
return open;
}
public void close() throws IOException {
this.open = false;
this.position = -1;
}
@Override
public String toString() {
return "SeekableBlobByteChannel{" +
"blockBlobURL=" + blockBlobURL +
", bufferSize=" + bufferSize +
", size=" + size +
", position=" + position +
", open=" + true +
'}';
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment