Skip to content

Instantly share code, notes, and snippets.

@gmbeard
Created December 6, 2017 17:52
Show Gist options
  • Save gmbeard/ca70e89e4ec9ed98436b24b79ee02c29 to your computer and use it in GitHub Desktop.
Save gmbeard/ca70e89e4ec9ed98436b24b79ee02c29 to your computer and use it in GitHub Desktop.
Rough implementation of Rust's Future / Stream traits, to demonstrate their combinators and ownership semantics
use std::mem;
enum Chain<T, F, U> {
First(T, F),
Second(U),
Done,
}
impl<T, F, U> Future for Chain<T, F, U>
where T: Future,
U: Future,
U::Error: From<T::Error>,
F: FnOnce(T::Item) -> U,
{
type Item = U::Item;
type Error = U::Error;
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
let result = match *self {
Chain::First(ref mut val, _) => match val.poll() {
Ok(PollResult::Pending) => return Ok(PollResult::Pending),
Ok(PollResult::Complete(r)) => Ok(r),
Err(e) => Err(e),
},
Chain::Second(ref mut val) => return val.poll(),
Chain::Done => panic!("poll called on finished result!"),
};
let next = match mem::replace(self, Chain::Done) {
Chain::First(_, f) => {
let mut next = f(result?);
match next.poll()? {
PollResult::Complete(r) => return Ok(PollResult::Complete(r)),
_ => Chain::Second(next),
}
}
_ => unreachable!(),
};
*self = next;
Ok(PollResult::Pending)
}
}
enum PollResult<T> {
Pending,
Complete(T),
}
/// A trait to represent a piece of work that will complete
/// after one or more calls to its `poll` function. I.e. work
/// that will complete at some point in the future.
trait Future {
type Item;
type Error;
/// This function must be called periodically in order
/// for the operation to make progress.
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error>;
/// A combinator function that *consumes* self and returns
/// a type that also implements `Future`.
///
/// The important thing to note about this is that `F` must return
/// another type implementing `Future`.
///
/// The type returned by `and_then` will first `poll` `self` and
/// then `poll` the future returned by `F`
fn and_then<F, U>(self, f: F) -> Chain<Self, F, U>
where F: FnOnce(Self::Item) -> U,
Self: Sized,
U: Future,
{
Chain::First(self, f)
}
}
/// Similar to `Future` this type represents work that will complete
/// at some point in the future. It differs in that it will deliver
/// items one after another until there are none left.
trait Stream {
type Item;
type Error;
/// This function must be called periodically in order
/// for the operation to make progress. Once this function
/// returns `Ok(PollResult::Complete(None))` then all the
/// work is complete.
fn poll(&mut self) -> Result<PollResult<Option<Self::Item>>, Self::Error>;
/// A combinator that *consumes* `self` and will call `F`
/// for every item in the stream. The return type of `for_each`
/// implements `Future` and will only complete when the stream
/// completes.
fn for_each<F>(self, f: F) -> ForEach<Self, F>
where F: Fn(Self::Item),
Self: Sized,
{
ForEach(self, f)
}
}
struct ForEach<T: Stream, F>(T, F);
impl<T, F, U> Future for ForEach<T, F>
where T: Stream,
F: Fn(T::Item) -> U,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
let value = match self.0.poll()? {
PollResult::Complete(Some(val)) => val,
PollResult::Complete(None) => return Ok(PollResult::Complete(())),
PollResult::Pending => return Ok(PollResult::Pending),
};
(self.1)(value);
Ok(PollResult::Pending)
}
}
/// A `Future` implementing type that will complete on the
/// 2nd `poll` attempt.
enum PollTwice<T> {
First(T),
Second(T),
Done,
}
impl<T> Future for PollTwice<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
eprintln!("PollTwice::poll()");
let next = match mem::replace(self, PollTwice::Done) {
PollTwice::First(val) => PollTwice::Second(val),
PollTwice::Second(val) => return Ok(PollResult::Complete(val)),
PollTwice::Done => panic!("PollTwice can only be polled twice!"),
};
*self = next;
Ok(PollResult::Pending)
}
}
/// A `Future` implementing type that encapsulates a `Channel`,
/// returning ownership once it completes
struct MyThing(PollTwice<Channel>);
impl MyThing {
fn new(channel: Channel) -> MyThing {
MyThing(PollTwice::First(channel))
}
}
impl Future for MyThing {
type Item = Channel;
type Error = ();
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
self.0.poll()
}
}
/// This type is able to perform the following async operations:
///
/// - `do_something`
/// - `do_something_else`
/// - `do_another_thing`
///
/// Each of these will return a `Future` type, *consuming* `self` in
/// the process.
struct Channel;
impl Channel {
/// Creates a `Future` that will *do something*. This function
/// consumes self so that we can pass it to a downstream `Future`.
fn do_something(self) -> ChannelOp {
ChannelOp(PollTwice::First(self))
}
/// Creates a `Future` that will *do something else*. This function
/// consumes self so that we can pass it to a downstream `Future`.
fn do_something_else(self) -> ChannelOp {
ChannelOp(PollTwice::First(self))
}
/// Creates a `Future` that will *do another thing*. This function
/// consumes self so that we can pass it to a downstream `Future`.
///
/// The result of the future created from this will be a tuple of
/// `(Channel, S: Stream)`.
fn do_another_thing(self) -> ChannelStreamOp {
ChannelStreamOp(
PollTwice::First((
self,
// A `Stream` that counts to 4, resolving
// each value on the second attempt (just to
// simulate an asynchronous operation).
ChannelStream(vec![
PollTwice::First(1),
PollTwice::First(2),
PollTwice::First(3),
PollTwice::First(4),
])
))
)
}
/// This function doesn't return anything. It will just *do the
/// last thing* without consuming `self`
fn do_the_last_thing(&self, val: usize) {
println!("{}", val);
}
}
/// A `Stream` implementing type that just pops the front of
/// its internal `Vec<_>` until there are no values left.
struct ChannelStream(Vec<PollTwice<usize>>);
impl Stream for ChannelStream {
type Item = usize;
type Error = ();
fn poll(&mut self) -> Result<PollResult<Option<Self::Item>>, Self::Error> {
let result = match self.0.get_mut(0) {
None => return Ok(PollResult::Complete(None)),
Some(item) => match item.poll()? {
PollResult::Pending => return Ok(PollResult::Pending),
PollResult::Complete(item) => item,
},
};
self.0.remove(0);
Ok(PollResult::Complete(Some(result)))
}
}
/// A `Future` implementing type that represents async work perfomed
/// by a `Channel`. Internally, we just simulate this work with
/// a `PollTwice` instance.
struct ChannelOp(PollTwice<Channel>);
impl Future for ChannelOp {
type Item = Channel;
type Error = ();
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
self.0.poll()
}
}
/// A `Future` implementing type that represents async work perfomed
/// by a `Channel` and results in a `Stream` type that requires further
/// work. Internally, we just simulate this work with a `PollTwice`
/// instance.
struct ChannelStreamOp(PollTwice<(Channel, ChannelStream)>);
impl Future for ChannelStreamOp {
type Item = (Channel, ChannelStream);
type Error = ();
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> {
self.0.poll()
}
}
fn main() {
let my_thing = MyThing::new(Channel);
let mut future = my_thing.and_then(|channel| {
// The `do_...` functions (apart from the last) consume
// `channel` (take ownership) so it can be passed down
// the line of `and_then` calls...
channel.do_something() // Returns a `Future`
.and_then(|ch| {
ch.do_something_else() // Returns a `Future`
})
.and_then(|ch| {
ch.do_another_thing() // Returns a `Future`
})
.and_then(|(ch, stream)| {
// The result of `for_each` implements `Future`. It
// will complete when all of its `Stream`'s items are
// exhausted (or it reports an error). We can `move`
// `ch` into the closure because nothing else needs
// it at this stage...
stream.for_each(move |n| {
println!("{}", n);
ch.do_the_last_thing(n);
})
})
});
// `future` is now just a big *state machine*. We need to poll
// it to completion (or error)...
loop {
if let PollResult::Complete(_) = future.poll().unwrap() {
break;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment