Last active
July 12, 2021 12:17
-
-
Save animeshtrivedi/436d95bd8a43a6f47e580594cb8138c3 to your computer and use it in GitHub Desktop.
WritableByteChannel implementation for HDFS. WritableByteChannel is used in Apache ArrowFileWriter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* This code snippet is a part of the blog at | |
https://github.com/animeshtrivedi/blog/blob/master/post/2017-12-26-arrow.md | |
*/ | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import java.io.IOException; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.WritableByteChannel; | |
/** | |
* Created by atr on 19.12.17. | |
*/ | |
public class HDFSWritableByteChannel implements WritableByteChannel { | |
private FSDataOutputStream outStream; | |
private Boolean isOpen; | |
private byte[] tempBuffer; | |
public HDFSWritableByteChannel(FSDataOutputStream outStream){ | |
this.outStream = outStream; | |
this.isOpen = true; | |
// 1MB buffering | |
this.tempBuffer = new byte[1024*1024]; | |
} | |
private int writeDirectBuffer(ByteBuffer src) throws IOException { | |
int remaining = src.remaining(); | |
int soFar = 0; | |
while(soFar < remaining){ | |
int toPush = Math.min(remaining - soFar, this.tempBuffer.length); | |
// this will move the position index | |
src.get(this.tempBuffer, 0, toPush); | |
// i have no way of knowing how much can i push at HDFS | |
this.outStream.write(this.tempBuffer, 0, toPush); | |
soFar+=toPush; | |
} | |
return remaining; | |
} | |
private int writeHeapBuffer(ByteBuffer src) throws IOException { | |
int remaining = src.remaining(); | |
// get the heap buffer directly and copy | |
this.outStream.write(src.array(), src.position(), remaining); | |
src.position(src.position() + remaining); | |
return remaining; | |
} | |
@Override | |
final public int write(ByteBuffer src) throws IOException { | |
if(src.isDirect()){ | |
return writeDirectBuffer(src); | |
} else { | |
return writeHeapBuffer(src); | |
} | |
} | |
@Override | |
final public boolean isOpen() { | |
return this.isOpen; | |
} | |
@Override | |
final public void close() throws IOException { | |
// flushes the client buffer | |
this.outStream.hflush(); | |
// to the disk | |
this.outStream.hsync(); | |
this.outStream.close(); | |
this.isOpen = false; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you ❤️