Skip to content

Instantly share code, notes, and snippets.

Created April 12, 2017 11:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/e793c7d8047ed474cf6808f588d836ce to your computer and use it in GitHub Desktop.
Save anonymous/e793c7d8047ed474cf6808f588d836ce to your computer and use it in GitHub Desktop.
Shared via Rust Playground
/// Unwraps the ready value of a poll result, or returns early from the
/// enclosing function.
///
/// * `Ok(Async::Ready(value))` evaluates to `value`.
/// * `Ok(Async::NotReady)` makes the enclosing function return
///   `Ok(Async::NotReady)`.
/// * `Err(err)` makes the enclosing function return `Err`, with the error
///   converted through `From::from`.
#[macro_export]
macro_rules! try_ready {
    ($e:expr) => {
        match $e {
            Ok($crate::Async::Ready(value)) => value,
            Ok($crate::Async::NotReady) => return Ok($crate::Async::NotReady),
            Err(err) => return Err(From::from(err)),
        }
    };
}
/// A computation that is driven to completion by repeated polling.
pub trait Future {
    /// Value produced on success.
    type Item;
    /// Error produced on failure.
    type Error;

    /// Makes one attempt to resolve the computation.
    ///
    /// The result is `Ok(Async::Ready(item))`, `Ok(Async::NotReady)` or
    /// `Err(error)`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
pub trait IntoFuture {
type Future: Future<Item=Self::Item, Error=Self::Error>;
type Item;
type Error;
fn into_future(self) -> Self::Future;
}
/// Outcome of a single successful poll: either a value or "not yet".
///
/// The derives make poll results printable and comparable (for example in
/// `assert_eq!`) and cheap to copy when `T` allows it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Async<T> {
    /// The value is available.
    Ready(T),
    /// The value is not available yet.
    NotReady,
}

impl<T> Async<T> {
    /// Applies `f` to the ready value, leaving `NotReady` untouched.
    pub fn map<F, U>(self, f: F) -> Async<U>
    where
        F: FnOnce(T) -> U,
    {
        match self {
            Async::Ready(t) => Async::Ready(f(t)),
            Async::NotReady => Async::NotReady,
        }
    }

    /// Returns `true` for `Async::Ready`.
    pub fn is_ready(&self) -> bool {
        match *self {
            Async::Ready(_) => true,
            Async::NotReady => false,
        }
    }

    /// Returns `true` for `Async::NotReady`.
    pub fn is_not_ready(&self) -> bool {
        !self.is_ready()
    }
}
/// Result of polling: `Ok(Async::Ready(value))`, `Ok(Async::NotReady)` or
/// `Err(error)`.
///
/// Public because it appears in the signatures of the public `Future` and
/// `Stream` traits, so implementors outside this module need to name it.
pub type Poll<T, E> = Result<Async<T>, E>;
/// A sequence of values obtained by polling, where each poll consumes the
/// stream by value.
pub trait Stream: Sized {
    /// Value yielded by the stream.
    type Item;
    /// Error produced on failure.
    type Error;

    /// Makes one attempt to obtain the next value.
    ///
    /// Takes `self` by value. When a value is produced, the stream to
    /// continue with is handed back next to it as `Some((item, stream))`.
    /// `ForEach` in this file treats `Ok(Async::Ready(None))` as the end of
    /// the stream.
    fn poll(self) -> Poll<Option<(Self::Item, Self)>, Self::Error>;
}
/// State for running a closure on every item of a stream, where the closure
/// returns something convertible into a future.
///
/// The `U: IntoFuture` bound is required here because the `fut` field names
/// `U::Future`.
pub struct ForEach<S, F, U>
where
    U: IntoFuture,
{
    /// The stream still to be consumed; `None` once it has been taken and
    /// not put back.
    stream: Option<S>,
    /// Closure invoked with each item.
    f: F,
    /// Future for the item currently being processed, if any.
    fut: Option<U::Future>,
}
/// Builds a `ForEach` that will call `f` on every item of `s`.
///
/// Nothing is polled here: the stream is stored as-is and no per-item future
/// exists yet.
pub fn new<S, F, U>(s: S, f: F) -> ForEach<S, F, U>
where
    S: Stream,
    F: FnMut(S::Item) -> U,
    U: IntoFuture<Item = (), Error = S::Error>,
{
    let stream = Some(s);
    let fut = None;
    ForEach { stream, f, fut }
}
impl<S, F, U> Future for ForEach<S, F, U>
where S: Stream,
F: FnMut(S::Item) -> U,
U: IntoFuture<Item= (), Error = S::Error>,
{
type Item = ();
type Error = S::Error;
fn poll(&mut self) -> Poll<(), S::Error> {
loop {
if let Some(mut fut) = self.fut.take() {
if try!(fut.poll()).is_not_ready() {
self.fut = Some(fut);
return Ok(Async::NotReady);
}
}
match self.stream.take() {
Some(stream) => {
if let Some((item, stream)) = try_ready!(stream.poll()) {
self.fut = Some((self.f)(item).into_future());
self.stream = Some(stream);
}
},
None => return Ok(Async::Ready(())),
}
}
}
}
// Intentionally empty: this file only needs to compile as a binary.
fn main() {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment