Skip to content

Instantly share code, notes, and snippets.

@jcputney
Forked from blagerweij/S3OutputStream.java
Last active September 12, 2024 15:59
Show Gist options
  • Save jcputney/b5daeb86a1c0696859da2a0c3b466327 to your computer and use it in GitHub Desktop.
Save jcputney/b5daeb86a1c0696859da2a0c3b466327 to your computer and use it in GitHub Desktop.
OutputStream which wraps AWS Java SDK v2 S3Client, with support for streaming large files directly to S3
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
public class S3OutputStream extends OutputStream {
/**
* Default chunk size is 10MB
*/
protected static final int BUFFER_SIZE = 10000000;
/**
* The bucket-name on Amazon S3
*/
private final String bucket;
/**
* The path (key) name within the bucket
*/
private final String path;
/**
* The temporary buffer used for storing the chunks
*/
private final byte[] buf;
private final S3Client s3Client;
/**
* Collection of the etags for the parts that have been uploaded
*/
private final List<String> etags;
/**
* The position in the buffer
*/
private int position;
/**
* The unique id for this upload
*/
private String uploadId;
/**
* indicates whether the stream is still open / valid
*/
private boolean open;
/**
* Creates a new S3 OutputStream
*
* @param s3Client the AmazonS3 client
* @param bucket name of the bucket
* @param path path within the bucket
*/
public S3OutputStream(S3Client s3Client, String bucket, String path) {
this.s3Client = s3Client;
this.bucket = bucket;
this.path = path;
buf = new byte[BUFFER_SIZE];
position = 0;
etags = new ArrayList<>();
open = true;
}
public void cancel() {
open = false;
if (uploadId != null) {
s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.build());
}
}
@Override
public void write(int b) {
assertOpen();
if (position >= buf.length) {
flushBufferAndRewind();
}
buf[position++] = (byte) b;
}
/**
* Write an array to the S3 output stream.
*
* @param b the byte-array to append
*/
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
/**
* Writes an array to the S3 Output Stream
*
* @param byteArray the array to write
* @param o the offset into the array
* @param l the number of bytes to write
*/
@Override
public void write(byte[] byteArray, int o, int l) {
assertOpen();
int ofs = o;
int len = l;
int size;
while (len > (size = buf.length - position)) {
System.arraycopy(byteArray, ofs, buf, position, size);
position += size;
flushBufferAndRewind();
ofs += size;
len -= size;
}
System.arraycopy(byteArray, ofs, buf, position, len);
position += len;
}
/**
* Flushes the buffer by uploading a part to S3.
*/
@Override
public synchronized void flush() {
assertOpen();
}
@Override
public void close() {
if (open) {
open = false;
if (uploadId != null) {
if (position > 0) {
uploadPart();
}
CompletedPart[] completedParts = new CompletedPart[etags.size()];
for (int i = 0; i < etags.size(); i++) {
completedParts[i] = CompletedPart.builder()
.eTag(etags.get(i))
.partNumber(i + 1)
.build();
}
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
.build();
s3Client.completeMultipartUpload(completeMultipartUploadRequest);
} else {
PutObjectRequest putRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(path)
.contentLength((long) position)
.build();
RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
position);
s3Client.putObject(putRequest, requestBody);
}
}
}
private void assertOpen() {
if (!open) {
throw new IllegalStateException("Closed");
}
}
protected void flushBufferAndRewind() {
if (uploadId == null) {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.build();
CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest);
uploadId = multipartUpload.uploadId();
}
uploadPart();
position = 0;
}
protected void uploadPart() {
UploadPartRequest uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.partNumber(etags.size() + 1)
.contentLength((long) position)
.build();
RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
position);
UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadRequest, requestBody);
etags.add(uploadPartResponse.eTag());
}
}
@hamzahanafi11
Copy link

Great work, thanks for this implementation.
I tested in my project and it is working perfectly

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment