Skip to content

Instantly share code, notes, and snippets.

@mwmitchell
Created September 3, 2019 12:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwmitchell/c427158be165329bacc605d721b5f0d7 to your computer and use it in GitHub Desktop.
Save mwmitchell/c427158be165329bacc605d721b5f0d7 to your computer and use it in GitHub Desktop.
package com.lucidworks.fusion.service.connectors.job.emitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteSource;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
// https://softwarecave.org/2014/02/05/create-temporary-files-and-directories-using-java-nio2/
// Just like Guava's FileBackedOutputStream, but uses NIO instead.
/**
 * An {@link OutputStream} that buffers written bytes in memory until a write would push the
 * buffered size past a configurable threshold; at that point the buffered bytes are copied to a
 * temporary file created through {@link Files}, and all further writes go to that file. The data
 * written so far can be read back through {@link #asByteSource}.
 *
 * <p>Every method that touches the buffer state is {@code synchronized} on this instance.
 */
public class NIOFileBackedOutputStream extends OutputStream {
  /** Maximum number of bytes kept in memory before switching to the file buffer. */
  private final int fileThreshold;

  /** Whether the {@link ByteSource} view calls {@link #reset} from its finalizer. */
  private final boolean resetOnFinalize;

  /** Readable view over whatever has been written so far. */
  private final ByteSource source;

  /** Current write target: {@link #memory} or a stream onto {@link #file}. */
  private OutputStream out;

  /** In-memory buffer; {@code null} once the stream has switched to file buffering. */
  private MemoryOutput memory;

  /** Temporary file holding the data; {@code null} while still buffering in memory. */
  private Path file;

  /**
   * ByteArrayOutputStream that exposes its internals.
   */
  private static class MemoryOutput extends ByteArrayOutputStream {
    /** Returns the live backing array (not a copy); only the first {@link #getCount} bytes are valid. */
    byte[] getBuffer() {
      return buf;
    }

    /** Returns the number of valid bytes in the backing array. */
    int getCount() {
      return count;
    }
  }

  /**
   * Returns the file holding the data (possibly null).
   */
  @VisibleForTesting
  synchronized Path getFile() {
    return file;
  }

  /**
   * Creates a new instance that uses the given file threshold, and does not reset the data when the
   * {@link ByteSource} returned by {@link #asByteSource} is finalized.
   *
   * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
   */
  public NIOFileBackedOutputStream(int fileThreshold) {
    this(fileThreshold, false);
  }

  /**
   * Creates a new instance that uses the given file threshold, and optionally resets the data when
   * the {@link ByteSource} returned by {@link #asByteSource} is finalized.
   *
   * @param fileThreshold the number of bytes before the stream should switch to buffering to a file
   * @param resetOnFinalize if true, the {@link #reset} method will be called when the
   *     {@link ByteSource} returned by {@link #asByteSource} is finalized
   */
  public NIOFileBackedOutputStream(int fileThreshold, boolean resetOnFinalize) {
    this.fileThreshold = fileThreshold;
    this.resetOnFinalize = resetOnFinalize;
    memory = new MemoryOutput();
    out = memory;

    if (resetOnFinalize) {
      source =
          new ByteSource() {
            @Override
            public InputStream openStream() throws IOException {
              return openInputStream();
            }

            @Override
            protected void finalize() {
              try {
                reset();
              } catch (Throwable t) {
                // A finalizer has no caller to report to; print instead of propagating.
                t.printStackTrace(System.err);
              }
            }
          };
    } else {
      source =
          new ByteSource() {
            @Override
            public InputStream openStream() throws IOException {
              return openInputStream();
            }
          };
    }
  }

  /**
   * Returns a readable {@link ByteSource} view of the data that has been written to this stream.
   *
   * @return the same {@link ByteSource} instance on every call
   */
  public ByteSource asByteSource() {
    return source;
  }

  /**
   * Opens a stream over the data written so far: the temporary file if one exists, otherwise the
   * in-memory buffer. The in-memory stream wraps the live backing array rather than a copy.
   *
   * @throws IOException if the temporary file cannot be opened
   */
  private synchronized InputStream openInputStream() throws IOException {
    if (file != null) {
      return Files.newInputStream(file);
    }
    return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
  }

  /**
   * Calls {@link #close} if not already closed, and then resets this object back to its initial
   * state, for reuse. If data was buffered to a file, it will be deleted.
   *
   * @throws IOException if an I/O error occurred while closing the current stream or deleting the
   *     file buffer
   */
  public synchronized void reset() throws IOException {
    try {
      close();
    } finally {
      // Restore the in-memory state even when close() failed, so the object stays reusable.
      if (memory == null) {
        memory = new MemoryOutput();
      } else {
        memory.reset();
      }
      out = memory;
      if (file != null) {
        Path deleteMe = file;
        // Forget the file before deleting so a failed delete does not leave a stale reference.
        file = null;
        // Files.delete reports why the deletion failed and works for any Path provider.
        Files.delete(deleteMe);
      }
    }
  }

  @Override
  public synchronized void write(int b) throws IOException {
    update(1);
    out.write(b);
  }

  @Override
  public synchronized void write(byte[] b) throws IOException {
    write(b, 0, b.length);
  }

  @Override
  public synchronized void write(byte[] b, int off, int len) throws IOException {
    update(len);
    out.write(b, off, len);
  }

  @Override
  public synchronized void close() throws IOException {
    out.close();
  }

  @Override
  public synchronized void flush() throws IOException {
    out.flush();
  }

  /**
   * Checks if writing {@code len} bytes would go over threshold, and switches to file buffering if
   * so. Must be called while holding this instance's monitor.
   *
   * <p>If the transfer to the temporary file fails, the partially written file is closed and
   * deleted and this stream keeps buffering in memory.
   *
   * @param len the number of bytes about to be written
   * @throws IOException if the temporary file cannot be created or written
   */
  private void update(int len) throws IOException {
    // Sum in long so a very large len cannot overflow and bypass the threshold check.
    if (file != null || (long) memory.getCount() + len <= fileThreshold) {
      return;
    }

    Path temp = Files.createTempFile("FileBackedOutputStream", null);
    OutputStream transfer = null;
    try {
      if (resetOnFinalize) {
        // Finalizers are not guaranteed to be called on system shutdown;
        // this is insurance.
        temp.toFile().deleteOnExit();
      }
      transfer = Files.newOutputStream(temp);
      transfer.write(memory.getBuffer(), 0, memory.getCount());
      transfer.flush();
    } catch (IOException | RuntimeException e) {
      // Do not leak the open stream or the half-written temp file on failure.
      if (transfer != null) {
        try {
          transfer.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
      }
      try {
        Files.deleteIfExists(temp);
      } catch (IOException deleteFailure) {
        e.addSuppressed(deleteFailure);
      }
      throw e;
    }

    // We've successfully transferred the data; switch to writing to file
    out = transfer;
    file = temp;
    memory = null;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment