Skip to content

Instantly share code, notes, and snippets.

@Darksonn
Created November 2, 2019 21:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Darksonn/f33ce1029da7188b8c4e8b0eab1da758 to your computer and use it in GitHub Desktop.
Save Darksonn/f33ce1029da7188b8c4e8b0eab1da758 to your computer and use it in GitHub Desktop.
//! Futures that parses the `ResponseFuture` returned from hyper.
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;
use futures::stream::Stream;
use http::response::Parts;
use http::StatusCode;
use hyper::{client::ResponseFuture, Body};
use serde::Deserialize;
use std::marker::PhantomData;
use std::mem;
use crate::B2Error;
mod b2_stream;
//pub use self::b2_stream::B2Stream;
/// A future that reads all data from a hyper future and parses it with `serde_json`.
pub struct B2Future<T> {
state: State<T>,
}
enum State<T> {
Connecting(ResponseFuture),
Collecting(Parts, Body, Vec<u8>),
FailImmediately(B2Error),
Done(PhantomData<T>),
}
impl<T> B2Future<T>
where
for<'de> T: Deserialize<'de>,
{
/// Create a new `B2Future`.
pub fn new(resp: ResponseFuture) -> Self {
B2Future {
state: State::Connecting(resp),
}
}
/// Create a `B2Future` that immediately fails with the specified error.
pub fn err<E: Into<B2Error>>(err: E) -> Self {
B2Future {
state: State::FailImmediately(err.into()),
}
}
/// Returns `true` if the future is done.
pub fn is_done(&self) -> bool {
match self.state {
State::Done(_) => true,
_ => false,
}
}
}
impl<T> Future for B2Future<T>
where
for<'de> T: Deserialize<'de>,
{
type Output = Result<T, B2Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, B2Error>> {
let state_ref = unsafe { &mut self.get_unchecked_mut().state };
loop {
if let Some(poll) = unsafe { state_ref.poll(cx) } {
return poll;
}
}
}
}
impl<T> State<T>
where
for<'de> T: Deserialize<'de>,
{
#[inline]
fn done() -> Self {
State::Done(PhantomData)
}
/// self must be pinned in order to call this function
#[inline]
unsafe fn poll(&mut self, cx: &mut Context)
-> Option<Poll<Result<T, B2Error>>>
{
match self {
State::Connecting(ref mut fut) => {
let fut_pin = Pin::new_unchecked(fut);
match fut_pin.poll(cx) {
Poll::Pending => {
Some(Poll::Pending)
}
Poll::Ready(Ok(resp)) => {
let (parts, body) = resp.into_parts();
let size = crate::get_content_length(&parts);
*self = State::Collecting(parts, body, Vec::with_capacity(size));
None
}
Poll::Ready(Err(e)) => {
*self = State::done();
Some(Poll::Ready(Err(e.into())))
}
}
},
State::Collecting(ref parts, ref mut body, ref mut bytes) => {
let stream_pin = Pin::new_unchecked(body);
match stream_pin.poll_next(cx) {
Poll::Pending => {
Some(Poll::Pending)
}
Poll::Ready(Some(Ok(chunk))) => {
bytes.extend(&chunk[..]);
None
}
Poll::Ready(None) => {
if parts.status == StatusCode::OK {
match ::serde_json::from_slice(&bytes) {
Ok(t) => {
*self = State::done();
Some(Poll::Ready(Ok(t)))
}
Err(e) => {
*self = State::done();
Some(Poll::Ready(Err(e.into())))
}
}
} else {
match ::serde_json::from_slice(&bytes) {
Ok(err_msg) => {
let err = B2Error::B2Error(parts.status, err_msg);
*self = State::done();
Some(Poll::Ready(Err(err)))
}
Err(e) => {
*self = State::done();
Some(Poll::Ready(Err(e.into())))
}
}
}
}
Poll::Ready(Some(Err(e))) => {
*self = State::done();
Some(Poll::Ready(Err(e.into())))
}
}
},
State::FailImmediately(err) => {
// Put in a dummy error
let err = mem::replace(err, B2Error::ApiInconsistency(String::new()));
*self = State::done();
Some(Poll::Ready(Err(err)))
}
State::Done(_) => {
panic!("poll on finished backblaze_b2::b2_future::B2Future");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment