-
-
Save Darksonn/f33ce1029da7188b8c4e8b0eab1da758 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Futures that parses the `ResponseFuture` returned from hyper. | |
use std::future::Future; | |
use std::task::{Context, Poll}; | |
use std::pin::Pin; | |
use futures::stream::Stream; | |
use http::response::Parts; | |
use http::StatusCode; | |
use hyper::{client::ResponseFuture, Body}; | |
use serde::Deserialize; | |
use std::marker::PhantomData; | |
use std::mem; | |
use crate::B2Error; | |
mod b2_stream; | |
//pub use self::b2_stream::B2Stream; | |
/// A future that reads all data from a hyper future and parses it with `serde_json`. | |
pub struct B2Future<T> { | |
state: State<T>, | |
} | |
enum State<T> { | |
Connecting(ResponseFuture), | |
Collecting(Parts, Body, Vec<u8>), | |
FailImmediately(B2Error), | |
Done(PhantomData<T>), | |
} | |
impl<T> B2Future<T> | |
where | |
for<'de> T: Deserialize<'de>, | |
{ | |
/// Create a new `B2Future`. | |
pub fn new(resp: ResponseFuture) -> Self { | |
B2Future { | |
state: State::Connecting(resp), | |
} | |
} | |
/// Create a `B2Future` that immediately fails with the specified error. | |
pub fn err<E: Into<B2Error>>(err: E) -> Self { | |
B2Future { | |
state: State::FailImmediately(err.into()), | |
} | |
} | |
/// Returns `true` if the future is done. | |
pub fn is_done(&self) -> bool { | |
match self.state { | |
State::Done(_) => true, | |
_ => false, | |
} | |
} | |
} | |
impl<T> Future for B2Future<T> | |
where | |
for<'de> T: Deserialize<'de>, | |
{ | |
type Output = Result<T, B2Error>; | |
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, B2Error>> { | |
let state_ref = unsafe { &mut self.get_unchecked_mut().state }; | |
loop { | |
if let Some(poll) = unsafe { state_ref.poll(cx) } { | |
return poll; | |
} | |
} | |
} | |
} | |
impl<T> State<T> | |
where | |
for<'de> T: Deserialize<'de>, | |
{ | |
#[inline] | |
fn done() -> Self { | |
State::Done(PhantomData) | |
} | |
/// self must be pinned in order to call this function | |
#[inline] | |
unsafe fn poll(&mut self, cx: &mut Context) | |
-> Option<Poll<Result<T, B2Error>>> | |
{ | |
match self { | |
State::Connecting(ref mut fut) => { | |
let fut_pin = Pin::new_unchecked(fut); | |
match fut_pin.poll(cx) { | |
Poll::Pending => { | |
Some(Poll::Pending) | |
} | |
Poll::Ready(Ok(resp)) => { | |
let (parts, body) = resp.into_parts(); | |
let size = crate::get_content_length(&parts); | |
*self = State::Collecting(parts, body, Vec::with_capacity(size)); | |
None | |
} | |
Poll::Ready(Err(e)) => { | |
*self = State::done(); | |
Some(Poll::Ready(Err(e.into()))) | |
} | |
} | |
}, | |
State::Collecting(ref parts, ref mut body, ref mut bytes) => { | |
let stream_pin = Pin::new_unchecked(body); | |
match stream_pin.poll_next(cx) { | |
Poll::Pending => { | |
Some(Poll::Pending) | |
} | |
Poll::Ready(Some(Ok(chunk))) => { | |
bytes.extend(&chunk[..]); | |
None | |
} | |
Poll::Ready(None) => { | |
if parts.status == StatusCode::OK { | |
match ::serde_json::from_slice(&bytes) { | |
Ok(t) => { | |
*self = State::done(); | |
Some(Poll::Ready(Ok(t))) | |
} | |
Err(e) => { | |
*self = State::done(); | |
Some(Poll::Ready(Err(e.into()))) | |
} | |
} | |
} else { | |
match ::serde_json::from_slice(&bytes) { | |
Ok(err_msg) => { | |
let err = B2Error::B2Error(parts.status, err_msg); | |
*self = State::done(); | |
Some(Poll::Ready(Err(err))) | |
} | |
Err(e) => { | |
*self = State::done(); | |
Some(Poll::Ready(Err(e.into()))) | |
} | |
} | |
} | |
} | |
Poll::Ready(Some(Err(e))) => { | |
*self = State::done(); | |
Some(Poll::Ready(Err(e.into()))) | |
} | |
} | |
}, | |
State::FailImmediately(err) => { | |
// Put in a dummy error | |
let err = mem::replace(err, B2Error::ApiInconsistency(String::new())); | |
*self = State::done(); | |
Some(Poll::Ready(Err(err))) | |
} | |
State::Done(_) => { | |
panic!("poll on finished backblaze_b2::b2_future::B2Future"); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment