Skip to content

Instantly share code, notes, and snippets.

@reu
Created January 17, 2022 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reu/b97efa9ae7c7cd7f6e5a027b7ca869e3 to your computer and use it in GitHub Desktop.
Save reu/b97efa9ae7c7cd7f6e5a027b7ca869e3 to your computer and use it in GitHub Desktop.
use anyhow::anyhow;
use aws_sdk_s3::{
model::{CompletedMultipartUpload, CompletedPart},
ByteStream, Client as S3Client,
};
use futures::{StreamExt, TryStreamExt};
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
pub async fn upload_file(
s3: &S3Client,
bucket: &str,
key: &str,
content: impl AsyncRead,
) -> anyhow::Result<()> {
let upload = s3
.create_multipart_upload()
.bucket(bucket)
.key(key)
.send()
.await?;
let upload_id = upload
.upload_id()
.ok_or(anyhow!("Failed to start multipart upload"))?;
let chunk_size = 64 * 1024;
let parts = ReaderStream::with_capacity(content, chunk_size)
.map_ok(|chunk| chunk.to_vec())
.try_chunks(5 * 1024 * 1024 / chunk_size)
.map_ok(|chunks| chunks.into_iter().flatten())
.enumerate()
.map(|(n, part)| async move {
let n: i32 = n.try_into()?;
let n = n + 1;
let part = s3
.upload_part()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.part_number(n)
.body(ByteStream::from(part?.collect::<Vec<u8>>()))
.send()
.await?;
let etag = part.e_tag().ok_or(anyhow!("No etag on uploaded part"))?;
Ok(CompletedPart::builder().part_number(n).e_tag(etag).build())
})
.buffered(4)
.try_collect::<Vec<_>>()
.await;
if let Err(err) = parts {
s3.abort_multipart_upload()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.send()
.await?;
return Err(err);
};
s3.complete_multipart_upload()
.bucket(bucket)
.key(key)
.upload_id(upload_id)
.multipart_upload(
CompletedMultipartUpload::builder()
.set_parts(Some(parts?))
.build(),
)
.send()
.await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment