Skip to content

Instantly share code, notes, and snippets.

@blagerweij
Created October 23, 2020 22:40
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb to your computer and use it in GitHub Desktop.
Save blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb to your computer and use it in GitHub Desktop.
OutputStream which wraps S3Client, with support for streaming large files directly to S3
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
public class S3OutputStream extends OutputStream {
/** Default chunk size is 10MB */
protected static final int BUFFER_SIZE = 10000000;
/** The bucket-name on Amazon S3 */
private final String bucket;
/** The path (key) name within the bucket */
private final String path;
/** The temporary buffer used for storing the chunks */
private final byte[] buf;
/** The position in the buffer */
private int position;
/** Amazon S3 client. TODO: support KMS */
private final AmazonS3 s3Client;
/** The unique id for this upload */
private String uploadId;
/** Collection of the etags for the parts that have been uploaded */
private final List<PartETag> etags;
/** indicates whether the stream is still open / valid */
private boolean open;
/**
* Creates a new S3 OutputStream
* @param s3Client the AmazonS3 client
* @param bucket name of the bucket
* @param path path within the bucket
*/
public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
this.s3Client = s3Client;
this.bucket = bucket;
this.path = path;
this.buf = new byte[BUFFER_SIZE];
this.position = 0;
this.etags = new ArrayList<>();
this.open = true;
}
/**
* Write an array to the S3 output stream.
*
* @param b the byte-array to append
*/
@Override
public void write(byte[] b) {
write(b,0,b.length);
}
/**
* Writes an array to the S3 Output Stream
*
* @param byteArray the array to write
* @param o the offset into the array
* @param l the number of bytes to write
*/
@Override
public void write(final byte[] byteArray, final int o, final int l) {
this.assertOpen();
int ofs = o, len = l;
int size;
while (len > (size = this.buf.length - position)) {
System.arraycopy(byteArray, ofs, this.buf, this.position, size);
this.position += size;
flushBufferAndRewind();
ofs += size;
len -= size;
}
System.arraycopy(byteArray, ofs, this.buf, this.position, len);
this.position += len;
}
/**
* Flushes the buffer by uploading a part to S3.
*/
@Override
public synchronized void flush() {
this.assertOpen();
}
protected void flushBufferAndRewind() {
if (uploadId == null) {
final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
.withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
this.uploadId = initResponse.getUploadId();
}
uploadPart();
this.position = 0;
}
protected void uploadPart() {
UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
.withBucketName(this.bucket)
.withKey(this.path)
.withUploadId(this.uploadId)
.withInputStream(new ByteArrayInputStream(buf,0,this.position))
.withPartNumber(this.etags.size() + 1)
.withPartSize(this.position));
this.etags.add(uploadResult.getPartETag());
}
@Override
public void close() {
if (this.open) {
this.open = false;
if (this.uploadId != null) {
if (this.position > 0) {
uploadPart();
}
this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
}
else {
final ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(this.position);
final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
.withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
this.s3Client.putObject(request);
}
}
}
public void cancel() {
this.open = false;
if (this.uploadId != null) {
this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
}
}
@Override
public void write(int b) {
this.assertOpen();
if (position >= this.buf.length) {
flushBufferAndRewind();
}
this.buf[position++] = (byte)b;
}
private void assertOpen() {
if (!this.open) {
throw new IllegalStateException("Closed");
}
}
}
@LeoColman
Copy link

Hey!
I adapted this file to use a KMS key. It's also a little bit shorter due to using Kotlin instead of Java, but it should work interoperably.

https://github.com/GuiaBolso/sftp-to-s3-connector/blob/7f98b173ac67e22a234f713f91633fb6389531b3/src/main/kotlin/br/com/guiabolso/sftptos3connector/internal/s3/S3OutputStream.kt

@Kalyan-D
Copy link

Unable to use this class.

We have created an inputstream from fileInputStream and also output stream using this class.
But when I ran the code the file is not uploaded in S3.

PFB for code snippet used:

import static org.jsoup.Jsoup.parse;

_```
public class LocalToS3 {
static TransferManager tm;
static AmazonS3 s3Client;
static String bucketName;
static String filePath;
static S3Utility s3Utility = new S3Utility();

public static void main(String[] args) throws Exception {


    HashMap<String, Object> s3Creds = JSONUtils.jsonToMap(FileUtils.readFileToString(new File(args[0]), Charset.defaultCharset()));

    s3Client = s3Utility.configureS3Client(s3Creds);

 
    bucketName = s3Creds.get("bucket").toString();
    filePath = s3Creds.get("dest_path").toString();
    InputStream inputStream = new FileInputStream(new File("C:\\workarea\\Test\\S3Utility\\S3Utility\\wikipathways-20210310-gpml-Anopheles_gambiae.zip"));
    OutputStream outputStream = new S3OutputStream(s3Client, bucketName, "data/unstructured/rsa/Anopheles_gambiae.zip");
    IOUtils.copy(inputStream, outputStream);

    LocalToS3 localToS3 = new LocalToS3();
   

}

}



Please help me how to use this?

@zubinjain
Copy link

Unable to use this class.

We have created an inputstream from fileInputStream and also output stream using this class.
But when I ran the code the file is not uploaded in S3.

PFB for code snippet used:

import static org.jsoup.Jsoup.parse;

_```
public class LocalToS3 {
static TransferManager tm;
static AmazonS3 s3Client;
static String bucketName;
static String filePath;
static S3Utility s3Utility = new S3Utility();

public static void main(String[] args) throws Exception {


    HashMap<String, Object> s3Creds = JSONUtils.jsonToMap(FileUtils.readFileToString(new File(args[0]), Charset.defaultCharset()));

    s3Client = s3Utility.configureS3Client(s3Creds);

 
    bucketName = s3Creds.get("bucket").toString();
    filePath = s3Creds.get("dest_path").toString();
    InputStream inputStream = new FileInputStream(new File("C:\\workarea\\Test\\S3Utility\\S3Utility\\wikipathways-20210310-gpml-Anopheles_gambiae.zip"));
    OutputStream outputStream = new S3OutputStream(s3Client, bucketName, "data/unstructured/rsa/Anopheles_gambiae.zip");
    IOUtils.copy(inputStream, outputStream);

    LocalToS3 localToS3 = new LocalToS3();
   

}

}



Please help me how to use this?

Your code is fine. You just need to close the output stream

@jcputney
Copy link

jcputney commented Oct 1, 2022

I've updated this gist for anyone using the v2 of the AWS Java SDK: https://gist.github.com/jcputney/b5daeb86a1c0696859da2a0c3b466327

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment