Skip to content

Instantly share code, notes, and snippets.

@adamchalmers
Created August 24, 2022 17:52
Show Gist options
  • Save adamchalmers/de1dc12a7eb70501583ac5e26417587c to your computer and use it in GitHub Desktop.
Save adamchalmers/de1dc12a7eb70501583ac5e26417587c to your computer and use it in GitHub Desktop.
use axum::{
body::Bytes,
extract::{self, multipart::MultipartError},
Extension,
};
use futures::Stream;
use reqwest::StatusCode;
async fn upload(
mp: extract::Multipart,
Extension(client): Extension<reqwest::Client>,
) -> Result<String, (StatusCode, String)> {
let streaming = MultipartStream { mp }.into_stream();
let streaming_body = reqwest::Body::wrap_stream(streaming);
client
.put(url::Url::parse("https://example.com/file").unwrap())
.body(streaming_body)
.send()
.await
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Oops".to_string()))
.map(|_| "Yay".to_string())
}
/// Wrapper around Multipart that impls Stream in a 'static way.
struct MultipartStream {
mp: extract::Multipart,
}
impl MultipartStream {
/// Owns the Multipart, so it's 'static
fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, MultipartError>> {
async_stream::stream! {
while let Some(field) = self.mp.next_field().await.unwrap() {
// `field` is Field<'a>, it references data owned by `self.mp`.
// It impls Stream, but it isn't 'static, so it cannot be used as a Reqwest body directly.
// Luckily, the return value of this function _is_ 'static.
for await value in field {
yield value;
}
}
}
}
}
@adamchalmers
Copy link
Author

This is in response to a question in the reqwest discord -- how do you write an Axum endpoint that accepts a multipart request body and streams it into an outgoing reqwest body?

It sounds simple, but it's actually pretty hard. Why? Well, the incoming request's multipart body contains several field. Each is Field<'a> where 'a is the lifetime of the parent Multipart. OK, so fields reference data from the Multipart, and therefore cannot outlive the Multipart. Makes sense.

But reqwest only lets you send a body stream which is 'static (i.e. not borrowed, or borrowed for the entire time your program runs). Why? Because of reqwest's particular design (connections have their own task, so when you send a body, it gets moved into that connection's dedicated task -- it can't reference data from a different task T1, because what if T1 goes away before the connection task finishes sending?)

So you can't use the Field stream as a reqwest body, because it's borrowed.

The solution is to make a new wrapper stream, which owns the Multipart and implements its own Stream. This stream iterates over each Multipart field and then yields from the per-field streams. Because the wrapper owns all the data, the resulting stream is 'static and can be used as a reqwest body :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment