Skip to content

Instantly share code, notes, and snippets.

@sfackler
Created October 22, 2019 22:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sfackler/3f5b9435548918775b1e360f209cdb99 to your computer and use it in GitHub Desktop.
Save sfackler/3f5b9435548918775b1e360f209cdb99 to your computer and use it in GitHub Desktop.
use bytes::Buf;
use futures::task::Context;
use futures::Sink;
use http::HeaderMap;
use http_body::{Body, SizeHint};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Poll;
/// A body whose data is produced by writing chunks into a [`BodySink`].
///
/// [`PushBody`] adapts an implementor of this trait into an `http_body::Body`.
pub trait WriteBody {
/// The type of chunk written into the sink.
type Data: Buf;
/// The error type reported by the polling methods.
type Error;
/// Attempts to write body data into `sink`.
///
/// As driven by [`PushBody`] in this file: `Poll::Ready(Ok(()))` marks the
/// data portion of the body as finished, while `Poll::Pending` means more
/// may follow. In both cases a chunk left in `sink` is handed to the
/// consumer. An `Err` is forwarded to the consumer as the body's error.
fn poll_write_body(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
sink: &mut BodySink<Self::Data>,
) -> Poll<Result<(), Self::Error>>;
/// Polls for the trailers of the body, if any.
///
/// [`PushBody`] forwards `Body::poll_trailers` directly to this method.
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>>;
/// Returns a size hint for the body.
///
/// [`PushBody`] forwards `Body::size_hint` directly to this method.
fn size_hint(&self) -> SizeHint;
}
/// A single-slot sink handed to [`WriteBody::poll_write_body`].
///
/// It holds at most one chunk: its `Sink` implementation reports
/// `Poll::Pending` from `poll_ready` while the slot is occupied.
#[pin_project]
pub struct BodySink<T> {
// The chunk written by the most recent `start_send`, if any.
chunk: Option<T>,
}
/// `Sink` implementation backed by the single `chunk` slot of [`BodySink`].
impl<T> Sink<T> for BodySink<T> {
    type Error = ();

    /// Reports readiness only while the slot is empty.
    ///
    /// No waker is registered when `Poll::Pending` is returned. Within this
    /// file that is sufficient because `PushBody::poll_data` creates a fresh
    /// sink for every poll and yields a stored chunk immediately.
    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let vacant = self.chunk.is_none();
        if vacant {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    /// Stores `item` in the slot.
    ///
    /// # Panics
    ///
    /// Panics if the slot already holds a chunk, i.e. the caller did not wait
    /// for `poll_ready` to report readiness.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        assert!(self.chunk.is_none());
        let slot = self.project().chunk;
        *slot = Some(item);
        Ok(())
    }

    /// Nothing is buffered beyond the slot, so flushing completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Closing performs no work and completes immediately.
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
/// Adapter that exposes a [`WriteBody`] as an `http_body::Body`.
#[pin_project]
pub struct PushBody<T> {
// The wrapped writer; structurally pinned so its `Pin<&mut Self>` methods can be called.
#[pin]
writer: T,
// Set once `poll_write_body` has returned `Poll::Ready(Ok(()))`; after that
// `poll_data` yields `None` without polling the writer again.
at_end: bool,
}
/// `Body` implementation that pulls chunks out of the wrapped [`WriteBody`].
impl<T> Body for PushBody<T>
where
    T: WriteBody,
{
    type Data = T::Data;
    type Error = T::Error;

    /// Polls the writer once with a fresh, empty [`BodySink`] and yields
    /// whatever chunk it wrote.
    ///
    /// * Writer finished (`Ready(Ok(()))`): the data stream is marked as ended
    ///   and the final chunk, if any, is returned; otherwise `None`.
    /// * Writer failed: the error is returned. It takes precedence over any
    ///   chunk written during the same poll, which is dropped.
    /// * Writer pending: a written chunk is returned right away; with no chunk
    ///   the body itself is pending.
    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        let this = self.project();
        if *this.at_end {
            return Poll::Ready(None);
        }

        let mut sink = BodySink { chunk: None };
        match this.writer.poll_write_body(cx, &mut sink) {
            Poll::Ready(Ok(())) => {
                *this.at_end = true;
                Poll::Ready(sink.chunk.map(Ok))
            }
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => match sink.chunk {
                Some(chunk) => Poll::Ready(Some(Ok(chunk))),
                None => Poll::Pending,
            },
        }
    }

    /// Forwards to the writer's `poll_trailers`.
    fn poll_trailers(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
        self.project().writer.poll_trailers(cx)
    }

    /// Always reports `false`.
    ///
    /// `at_end` only records that the *data* is exhausted; the writer may
    /// still produce trailers. Returning `true` would promise the consumer
    /// that `poll_trailers` yields `None` as well, allowing it to skip the
    /// trailers entirely. Nothing here tracks trailer completion, so the
    /// conservative answer (which the `Body` contract always permits) is used.
    fn is_end_stream(&self) -> bool {
        false
    }

    /// Forwards to the writer's `size_hint`.
    fn size_hint(&self) -> SizeHint {
        self.writer.size_hint()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment