Skip to content

Instantly share code, notes, and snippets.

@feliwir
Created March 5, 2024 15:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save feliwir/43ade88898e1f82391d902a26afb7600 to your computer and use it in GitHub Desktop.
Save feliwir/43ade88898e1f82391d902a26afb7600 to your computer and use it in GitHub Desktop.
Checking multipart-rs v0.1.0 (/home/stephan/Development/multipart-rs)
error[E0309]: the parameter type `E` may not live long enough
--> src/reader.rs:74:9
|
46 | impl<'a, E> MultipartReader<'a, E> {
| -- the parameter type `E` must be valid for the lifetime `'a` as defined here...
...
74 | MultipartReader::from_stream_with_boundary_and_type(stream, boundary, multipart_type)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...so that the type `E` will meet its required lifetime bounds
|
help: consider adding an explicit lifetime bound
|
46 | impl<'a, E: 'a> MultipartReader<'a, E> {
| ++++
error[E0309]: the parameter type `E` may not live long enough
--> src/reader.rs:131:9
|
46 | impl<'a, E> MultipartReader<'a, E> {
| -- the parameter type `E` must be valid for the lifetime `'a` as defined here...
...
131 | MultipartReader::from_stream_with_headers(stream, headers)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...so that the type `E` will meet its required lifetime bounds
|
help: consider adding an explicit lifetime bound
|
46 | impl<'a, E: 'a> MultipartReader<'a, E> {
| ++++
warning: unused variable: `e`
--> src/reader.rs:243:38
|
243 | Poll::Ready(Some(Err(e))) => {
| ^ help: if this is intentional, prefix it with an underscore: `_e`
|
= note: `#[warn(unused_variables)]` on by default
For more information about this error, try `rustc --explain E0309`.
warning: `multipart-rs` (lib) generated 1 warning
error: could not compile `multipart-rs` (lib) due to 2 previous errors; 1 warning emitted
use std::{
pin::Pin,
str,
task::{Context, Poll},
};
use bytes::{Buf, Bytes, BytesMut};
use futures_core::{stream::LocalBoxStream, Stream};
use futures_util::StreamExt;
use crate::{error::MultipartError, multipart_type::MultipartType};
#[derive(PartialEq, Debug)]
enum InnerState {
/// Stream eof
Eof,
/// Skip data until first boundary
FirstBoundary,
/// Reading boundary
Boundary,
/// Reading Headers,
Headers,
}
pub struct MultipartItem {
/// Headers
headers: Vec<(String, String)>,
/// Data
data: BytesMut,
}
pub struct MultipartReader<'a, E> {
pub boundary: String,
pub multipart_type: MultipartType,
/// Inner state
state: InnerState,
stream: LocalBoxStream<'a, Result<Bytes, E>>,
buf: BytesMut,
pending_item: Option<MultipartItem>,
}
impl<'a, E> MultipartReader<'a, E> {
pub fn from_stream_with_boundary_and_type<S>(
stream: S,
boundary: &str,
multipart_type: MultipartType,
) -> Result<MultipartReader<'a, E>, MultipartError>
where
S: Stream<Item = Result<Bytes, E>> + 'a,
{
Ok(MultipartReader {
stream: stream.boxed_local(),
boundary: boundary.to_string(),
multipart_type: multipart_type,
state: InnerState::FirstBoundary,
pending_item: None,
buf: BytesMut::new(),
})
}
pub fn from_data_with_boundary_and_type(
data: &[u8],
boundary: &str,
multipart_type: MultipartType,
) -> Result<MultipartReader<'a, E>, MultipartError>
where
E: std::error::Error,
{
let stream = futures_util::stream::iter(vec![Ok(Bytes::copy_from_slice(data))]);
MultipartReader::from_stream_with_boundary_and_type(stream, boundary, multipart_type)
}
pub fn from_stream_with_headers<S>(
stream: S,
headers: &Vec<(String, String)>,
) -> Result<MultipartReader<'a, E>, MultipartError>
where
S: Stream<Item = Result<Bytes, E>> + 'a,
E: std::error::Error,
{
// Search for the content-type header
let content_type = headers
.iter()
.find(|(key, _)| key.to_lowercase() == "content-type");
if content_type.is_none() {
return Err(MultipartError::NoContentType);
}
let ct = content_type
.unwrap()
.1
.parse::<mime::Mime>()
.map_err(|_e| MultipartError::InvalidContentType)?;
let boundary = ct
.get_param(mime::BOUNDARY)
.ok_or(MultipartError::InvalidBoundary)?;
if ct.type_() != mime::MULTIPART {
return Err(MultipartError::InvalidContentType);
}
let multipart_type = ct
.subtype()
.as_str()
.parse::<MultipartType>()
.map_err(|_| MultipartError::InvalidMultipartType)?;
Ok(MultipartReader {
stream: stream.boxed_local(),
boundary: boundary.to_string(),
multipart_type: multipart_type,
state: InnerState::FirstBoundary,
pending_item: None,
buf: BytesMut::new(),
})
}
pub fn from_data_with_headers(
data: &[u8],
headers: &Vec<(String, String)>,
) -> Result<MultipartReader<'a, E>, MultipartError>
where
E: std::error::Error,
{
let stream = futures_util::stream::iter(vec![Ok(Bytes::copy_from_slice(data))]);
MultipartReader::from_stream_with_headers(stream, headers)
}
// TODO: make this RFC compliant
fn is_boundary(self: &Self, data: &[u8]) -> bool {
data.starts_with(self.boundary.as_bytes())
}
}
impl<'a, E> Stream for MultipartReader<'a, E> {
type Item = Result<MultipartItem, MultipartError>;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let finder = memchr::memmem::Finder::new("\r\n");
loop {
while let Some(idx) = finder.find(&this.buf) {
println!("{}", String::from_utf8_lossy(&this.buf[..idx]));
match this.state {
InnerState::FirstBoundary => {
// Check if the last line was a boundary
if this.is_boundary(&this.buf[..idx]) {
this.state = InnerState::Headers;
};
}
InnerState::Boundary => {
// Check if the last line was a boundary
if this.is_boundary(&this.buf[..idx]) {
// If we have a pending item, return it
if let Some(item) = this.pending_item.take() {
// Skip to the next line
this.buf.advance(2 + idx);
// Next state are the headers
this.state = InnerState::Headers;
return std::task::Poll::Ready(Some(Ok(item)));
}
this.state = InnerState::Headers;
this.pending_item = Some(MultipartItem {
headers: vec![],
data: BytesMut::new(),
});
};
// Add the data to the pending item
this.pending_item
.as_mut()
.unwrap()
.data
.extend(&this.buf[..idx])
}
InnerState::Headers => {
// Check if we have a pending item or we should create one
if this.pending_item.is_none() {
this.pending_item = Some(MultipartItem {
headers: vec![],
data: BytesMut::new(),
});
}
// Read the header line and split it into key and value
let header = match str::from_utf8(&this.buf[..idx]) {
Ok(h) => h,
Err(_) => {
this.state = InnerState::Eof;
return std::task::Poll::Ready(Some(Err(
MultipartError::InvalidItemHeader,
)));
}
};
// This is no header anymore, we are at the end of the headers
if header.trim().is_empty() {
this.buf.advance(2 + idx);
this.state = InnerState::Boundary;
continue;
}
let header_parts: Vec<&str> = header.split(": ").collect();
if header_parts.len() != 2 {
this.state = InnerState::Eof;
return std::task::Poll::Ready(Some(Err(
MultipartError::InvalidItemHeader,
)));
}
// Add header entry to the pending item
this.pending_item
.as_mut()
.unwrap()
.headers
.push((header_parts[0].to_string(), header_parts[1].to_string()));
}
InnerState::Eof => {
return std::task::Poll::Ready(None);
}
}
// Skip to the next line
this.buf.advance(2 + idx);
}
// Read more data from the stream
match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => {
this.buf.extend_from_slice(&data);
}
Poll::Ready(None) => {
this.state = InnerState::Eof;
return std::task::Poll::Ready(None);
}
Poll::Ready(Some(Err(e))) => {
this.state = InnerState::Eof;
return std::task::Poll::Ready(Some(Err(MultipartError::PollingDataFailed)));
}
Poll::Pending => {
return std::task::Poll::Pending;
}
};
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[futures_test::test]
async fn valid_request() {
let headermap = vec![(
"Content-Type".to_string(),
"multipart/form-data; boundary=--974767299852498929531610575".to_string(),
)];
// Lines must end with CRLF
let data = b"--974767299852498929531610575\r
Content-Disposition: form-data; name=\"text\"\r
\r
text default\r
--974767299852498929531610575\r
Content-Disposition: form-data; name=\"file1\"; filename=\"a.txt\"\r
Content-Type: text/plain\r
\r
Content of a.txt.\r
\r\n--974767299852498929531610575\r
Content-Disposition: form-data; name=\"file2\"; filename=\"a.html\"\r
Content-Type: text/html\r
\r
<!DOCTYPE html><title>Content of a.html.</title>\r
\r
--974767299852498929531610575--\r\n";
assert!(MultipartReader::from_data_with_headers(data, &headermap).is_ok());
assert!(MultipartReader::from_data_with_boundary_and_type(
data,
"--974767299852498929531610575",
MultipartType::FormData
)
.is_ok());
// Poll all the items from the reader
let mut reader = MultipartReader::from_data_with_headers(data, &headermap).unwrap();
assert_eq!(reader.multipart_type, MultipartType::FormData);
let mut items = vec![];
loop {
match reader.next().await {
Some(Ok(item)) => items.push(item),
None => break,
Some(Err(e)) => panic!("Error: {:?}", e),
}
}
assert_eq!(items.len(), 3);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment