Skip to content

Instantly share code, notes, and snippets.

@coord-e
Last active March 13, 2021 09:56
Show Gist options
  • Save coord-e/7794bf029d872d17f315e4d90b670134 to your computer and use it in GitHub Desktop.
Save coord-e/7794bf029d872d17f315e4d90b670134 to your computer and use it in GitHub Desktop.
Upload to S3 (MinIO) via warp
use std::convert::TryInto;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, BytesMut};
use futures::stream::{self, Stream, TryStreamExt};
use mime::Mime;
use mpart_async::server::MultipartStream;
use rusoto_s3::{S3Client, S3};
use structopt::StructOpt;
use tracing::{event, Level};
use warp::Filter;
// Command-line / environment configuration for the upload test server.
//
// NOTE: plain `//` comments are used deliberately here: structopt turns `///`
// doc comments into `--help` text, so adding those would change the CLI output.
#[derive(Debug, Clone, StructOpt)]
struct Opt {
    // Socket address the HTTP server listens on (`--bind` or $TEST_SERVER_BIND).
    #[structopt(long, env = "TEST_SERVER_BIND")]
    bind: SocketAddr,
    // Bucket that uploads are written to (`--bucket`, defaults to "test").
    // `main` creates it when `HeadBucket` fails.
    #[structopt(short, long, default_value = "test")]
    bucket: String,
    // Endpoint of the S3-compatible server; used as the custom region endpoint
    // (`--endpoint` or $TEST_MINIO_ENDPOINT).
    #[structopt(long, env = "TEST_MINIO_ENDPOINT")]
    endpoint: String,
    // Access key for the static credentials provider ($TEST_MINIO_ACCESS_KEY).
    #[structopt(long, env = "TEST_MINIO_ACCESS_KEY")]
    access_key: String,
    // Secret key for the static credentials provider ($TEST_MINIO_ACCESS_SECRET).
    #[structopt(long, env = "TEST_MINIO_ACCESS_SECRET")]
    access_secret: String,
}
async fn collect_part_stream<E>(
mut stream: impl Stream<Item = Result<impl Buf, E>> + Unpin,
) -> Result<Vec<u8>, E> {
let mut result = BytesMut::new();
while let Some(buf) = stream.try_next().await? {
result.put(buf);
}
Ok(result.to_vec())
}
/// Stream the `content` field of a `multipart/form-data` request straight into a
/// single S3 `PutObject` call, without buffering the whole body.
///
/// `PutObject` needs the object size up front, so the client must send a
/// `content-length` field holding the decimal byte count of `content`. Both `key`
/// and `content-length` must come before `content`. The content part is the last
/// field read, because it is streamed directly to S3.
///
/// # Errors
///
/// Fails on any of the following:
/// - a malformed multipart body;
/// - a missing or unknown field;
/// - a non-UTF-8 `key`;
/// - an unparsable `content-length`;
/// - an S3 error.
async fn stream_handler(
    opt: Opt,
    client: S3Client,
    mime: Mime,
    body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static,
) -> anyhow::Result<()> {
    let boundary = mime
        .get_param("boundary")
        .context("no boundary")?
        .to_string();
    let mut data = MultipartStream::new(
        boundary,
        body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())),
    );
    let mut key = None;
    let mut stream = None;
    let mut length = None;
    let mut content_type = None;
    // `?` rather than `Ok(..)` matching: a malformed body is an error, not an
    // early end of the form.
    while let Some(part) = data.try_next().await? {
        let name = part.name().context("no name")?;
        event!(Level::INFO, %name, "part");
        match name {
            "key" => {
                let key_bytes = collect_part_stream(part).await?;
                key = Some(String::from_utf8(key_bytes)?);
            }
            "content-length" => {
                let length_bytes = collect_part_stream(part).await?;
                length = Some(std::str::from_utf8(&length_bytes)?.parse()?);
            }
            "content" => {
                content_type = Some(part.content_type().context("no content-type")?.to_owned());
                stream = Some(part);
                // The content is streamed to S3 below. `data` must not be polled
                // for another field until that part has been consumed, so stop
                // reading fields here.
                break;
            }
            name => bail!("unknown key {}", name),
        }
    }
    let key = key.context("no key")?;
    let length = length.context("no length")?;
    let stream = stream
        .context("no content")?
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
    event!(Level::INFO, %length, %key, "put object");
    let put_request = rusoto_s3::PutObjectRequest {
        bucket: opt.bucket.clone(),
        key,
        body: Some(rusoto_core::ByteStream::new(stream)),
        content_length: Some(length),
        content_type,
        ..Default::default()
    };
    client.put_object(put_request).await?;
    Ok(())
}
/// Buffer a whole `multipart/form-data` upload in memory, then store it in S3
/// with a single `PutObject` call.
///
/// Expected fields:
/// - `key`: the UTF-8 object key.
/// - `content`: the object bytes. The part's `Content-Type` becomes the
///   object's content type.
///
/// Any other field name is rejected.
async fn collect_handler(
    opt: Opt,
    client: S3Client,
    mut data: warp::multipart::FormData,
) -> anyhow::Result<()> {
    let (mut object_key, mut object_body, mut object_type) = (None, None, None);
    while let Some(field) = data.try_next().await? {
        let field_name = field.name();
        if field_name == "key" {
            let raw = collect_part_stream(field.stream()).await?;
            object_key = Some(String::from_utf8(raw)?);
        } else if field_name == "content" {
            // Content type is checked before the (potentially large) body is read.
            object_type = Some(field.content_type().context("no content_type")?.to_owned());
            object_body = Some(collect_part_stream(field.stream()).await?);
        } else {
            bail!("unknown key {}", field_name);
        }
    }
    // Struct fields are evaluated in source order, so a missing key is reported
    // before missing content.
    let put_request = rusoto_s3::PutObjectRequest {
        bucket: opt.bucket,
        key: object_key.context("no key")?,
        body: Some(object_body.context("no content")?.into()),
        content_type: object_type,
        ..Default::default()
    };
    let _output = client.put_object(put_request).await?;
    Ok(())
}
async fn multipart_handler(
opt: Opt,
client: S3Client,
mime: Mime,
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static,
) -> anyhow::Result<()> {
let boundary = mime
.get_param("boundary")
.context("no boundary")?
.to_string();
let mut data = MultipartStream::new(
boundary,
body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())),
);
let mut key = None;
let mut stream = None;
let mut content_type = None;
while let Some(part) = data.try_next().await? {
let name = part.name().context("no name")?;
event!(Level::INFO, %name, "part");
match name {
"key" => {
let key_bytes = collect_part_stream(part).await?;
key = Some(String::from_utf8(key_bytes)?);
}
"content" => {
content_type = Some(part.content_type().context("no content_type")?.to_owned());
stream = Some(part);
break;
}
name => bail!("unknown key {}", name),
}
}
let key = key.context("no key")?;
let mut stream = stream.context("no content")?;
let part_size: u64 = 10 * 1024 * 1024;
event!(Level::INFO, "create multipart upload");
let create_multipart_request = rusoto_s3::CreateMultipartUploadRequest {
bucket: opt.bucket.clone(),
key: key.clone(),
content_type,
..Default::default()
};
let create_multipart_output = client
.create_multipart_upload(create_multipart_request)
.await?;
let upload_id = create_multipart_output.upload_id.context("no upload_id")?;
let mut buf = BytesMut::with_capacity((part_size as f64 * 1.5) as usize);
let mut part_number = 0;
let mut completed_parts = Vec::new();
while let Some(chunk) = stream.try_next().await? {
event!(Level::TRACE, len = %chunk.len(), "chunk");
buf.put(chunk);
let len: u64 = buf.len().try_into()?;
if len >= part_size {
event!(Level::INFO, %len, "upload part");
let body = buf.copy_to_bytes(buf.remaining());
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(body) }));
let upload_request = rusoto_s3::UploadPartRequest {
bucket: opt.bucket.clone(),
key: key.clone(),
body: Some(body),
content_length: Some(len.try_into()?),
part_number,
upload_id: upload_id.clone(),
..Default::default()
};
let output = client.upload_part(upload_request).await?;
let e_tag = output.e_tag.context("no e_tag")?;
completed_parts.push(rusoto_s3::CompletedPart {
e_tag: Some(e_tag),
part_number: Some(part_number),
});
part_number += 1;
}
}
if buf.has_remaining() {
let len = buf.len();
event!(Level::INFO, %len, "upload part (last)");
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(buf.freeze()) }));
let upload_request = rusoto_s3::UploadPartRequest {
bucket: opt.bucket.clone(),
key: key.clone(),
body: Some(body),
content_length: Some(len.try_into()?),
part_number,
upload_id: upload_id.clone(),
..Default::default()
};
let output = client.upload_part(upload_request).await?;
let e_tag = output.e_tag.context("no e_tag")?;
completed_parts.push(rusoto_s3::CompletedPart {
e_tag: Some(e_tag),
part_number: Some(part_number),
});
}
event!(Level::INFO, "complete multipart upload");
let complete_request = rusoto_s3::CompleteMultipartUploadRequest {
bucket: opt.bucket.clone(),
key,
upload_id,
multipart_upload: Some(rusoto_s3::CompletedMultipartUpload {
parts: Some(completed_parts),
}),
..Default::default()
};
client.complete_multipart_upload(complete_request).await?;
Ok(())
}
async fn multipart_concurrent_handler(
opt: Opt,
client: S3Client,
mime: Mime,
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static,
) -> anyhow::Result<()> {
let boundary = mime
.get_param("boundary")
.context("no boundary")?
.to_string();
let mut data = MultipartStream::new(
boundary,
body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())),
);
let mut key = None;
let mut stream = None;
let mut content_type = None;
while let Some(part) = data.try_next().await? {
let name = part.name().context("no name")?;
event!(Level::INFO, %name, "part");
match name {
"key" => {
let key_bytes = collect_part_stream(part).await?;
key = Some(String::from_utf8(key_bytes)?);
}
"content" => {
content_type = Some(part.content_type().context("no content_type")?.to_owned());
stream = Some(part);
break;
}
name => bail!("unknown key {}", name),
}
}
let key = key.context("no key")?;
let mut stream = stream.context("no content")?;
let part_size: u64 = 10 * 1024 * 1024;
event!(Level::INFO, "create multipart upload");
let create_multipart_request = rusoto_s3::CreateMultipartUploadRequest {
bucket: opt.bucket.clone(),
key: key.clone(),
content_type,
..Default::default()
};
let create_multipart_output = client
.create_multipart_upload(create_multipart_request)
.await?;
let upload_id = create_multipart_output.upload_id.context("no upload_id")?;
let client = Arc::new(client);
let mut buf = BytesMut::with_capacity((part_size as f64 * 1.5) as usize);
let mut part_number = 0;
let mut part_futures = Vec::new();
while let Some(chunk) = stream.try_next().await? {
event!(Level::TRACE, len = %chunk.len(), "chunk");
buf.put(chunk);
let len: u64 = buf.len().try_into()?;
if len >= part_size {
let body = buf.copy_to_bytes(buf.remaining());
let fut = {
let bucket = opt.bucket.clone();
let upload_id = upload_id.clone();
let key = key.clone();
let client = Arc::clone(&client);
async move {
event!(Level::INFO, %len, "upload part");
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(body) }));
let upload_request = rusoto_s3::UploadPartRequest {
bucket,
key,
body: Some(body),
content_length: Some(len.try_into()?),
part_number,
upload_id,
..Default::default()
};
let output = client.upload_part(upload_request).await?;
let e_tag = output.e_tag.context("no e_tag")?;
Ok::<_, anyhow::Error>(rusoto_s3::CompletedPart {
e_tag: Some(e_tag),
part_number: Some(part_number),
})
}
};
part_futures.push(tokio::spawn(fut));
part_number += 1;
}
}
if buf.has_remaining() {
let fut = {
let bucket = opt.bucket.clone();
let upload_id = upload_id.clone();
let key = key.clone();
let client = Arc::clone(&client);
async move {
let len = buf.len();
event!(Level::INFO, %len, "upload part (last)");
let body =
rusoto_core::ByteStream::new(stream::once(async move { Ok(buf.freeze()) }));
let upload_request = rusoto_s3::UploadPartRequest {
bucket,
key,
body: Some(body),
content_length: Some(len.try_into()?),
part_number,
upload_id,
..Default::default()
};
let output = client.upload_part(upload_request).await?;
let e_tag = output.e_tag.context("no e_tag")?;
Ok::<_, anyhow::Error>(rusoto_s3::CompletedPart {
e_tag: Some(e_tag),
part_number: Some(part_number),
})
}
};
part_futures.push(tokio::spawn(fut));
}
let mut completed_parts = Vec::with_capacity(part_futures.len());
for fut in part_futures {
completed_parts.push(fut.await??);
}
event!(Level::INFO, "complete multipart upload");
let complete_request = rusoto_s3::CompleteMultipartUploadRequest {
bucket: opt.bucket.clone(),
key,
upload_id,
multipart_upload: Some(rusoto_s3::CompletedMultipartUpload {
parts: Some(completed_parts),
}),
..Default::default()
};
client.complete_multipart_upload(complete_request).await?;
Ok(())
}
/// Custom warp rejection carrying the error returned by an upload handler.
///
/// The `anyhow::Error` is wrapped in an `Arc`, presumably so the type can derive
/// `Clone`; `anyhow::Error` itself is not `Clone`.
#[derive(Debug, Clone)]
struct UploadError(Arc<anyhow::Error>);
// Marker impl that allows `UploadError` to be passed to `warp::reject::custom`.
impl warp::reject::Reject for UploadError {}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
tracing_subscriber::fmt().pretty().init();
let dispatcher = rusoto_core::request::HttpClient::new()?;
let credentials = rusoto_credential::StaticProvider::new_minimal(
opt.access_key.clone(),
opt.access_secret.clone(),
);
let region = rusoto_core::Region::Custom {
name: "us-east-1".to_owned(),
endpoint: opt.endpoint.clone(),
};
let client = S3Client::new_with(dispatcher, credentials, region);
if client
.head_bucket(rusoto_s3::HeadBucketRequest {
bucket: opt.bucket.clone(),
..Default::default()
})
.await
.is_err()
{
event!(Level::INFO, name = %opt.bucket, "create bucket");
let create_bucket_req = rusoto_s3::CreateBucketRequest {
bucket: opt.bucket.clone(),
..Default::default()
};
client.create_bucket(create_bucket_req).await?;
}
let bind = opt.bind;
let with_opt = warp::any().map(move || opt.clone());
let with_client = warp::any().map(move || client.clone());
let filter = warp::post().and(warp::path("upload")).and(
warp::path("stream")
.and(with_opt.clone())
.and(with_client.clone())
.and(warp::header::<Mime>("content-type"))
.and(warp::body::stream())
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move {
match stream_handler(opt, client, mime, data).await {
Ok(()) => Ok("OK"),
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))),
}
})
.or(warp::path("collect")
.and(with_opt.clone())
.and(with_client.clone())
.and(warp::multipart::form().max_length(512 * 1024 * 1024))
.and_then(
|opt: Opt, client: S3Client, data: warp::multipart::FormData| async move {
match collect_handler(opt, client, data).await {
Ok(()) => Ok("OK"),
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))),
}
},
))
.or(warp::path("multipart")
.and(with_opt.clone())
.and(with_client.clone())
.and(warp::header::<Mime>("content-type"))
.and(warp::body::stream())
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move {
match multipart_handler(opt, client, mime, data).await {
Ok(()) => Ok("OK"),
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))),
}
}))
.or(warp::path("multipart-concurrent")
.and(with_opt)
.and(with_client)
.and(warp::header::<Mime>("content-type"))
.and(warp::body::stream())
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move {
match multipart_concurrent_handler(opt, client, mime, data).await {
Ok(()) => Ok("OK"),
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))),
}
})),
);
warp::serve(filter).run(bind).await;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment