Skip to content

Instantly share code, notes, and snippets.

@conradludgate
Last active December 2, 2021 15:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save conradludgate/f2dc931b4eb663570372d1aad06af4ec to your computer and use it in GitHub Desktop.
Save conradludgate/f2dc931b4eb663570372d1aad06af4ec to your computer and use it in GitHub Desktop.
#![feature(generators, generator_trait)]
use std::ops::Generator;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::stream::Stream;
use futures_core::{FusedStream, Future};
use futures_util::{pin_mut, StreamExt};
use pin_project::pin_project;
fn zero_to_three() -> impl Stream<Item = u32> {
AsyncStream::new(|| {
for i in 0..3 {
let fut = async move {
tokio::time::sleep(Duration::from_secs(1)).await;
i
};
yield fut;
}
})
}
#[tokio::main]
async fn main() {
let s = zero_to_three();
pin_mut!(s);
while let Some(value) = s.next().await {
println!("got {}", value);
}
}
#[derive(Debug)]
#[pin_project]
pub struct AsyncStream<G: Generator<Return = ()>>
where
G::Yield: Future,
{
#[pin]
fut: Option<G::Yield>,
#[pin]
generator: Option<G>,
}
impl<G: Generator<Return = ()>> AsyncStream<G>
where
G::Yield: Future,
{
pub fn new(generator: G) -> Self {
Self {
fut: None,
generator: Some(generator),
}
}
}
impl<G> FusedStream for AsyncStream<G>
where
G: Generator<Return = ()>,
G::Yield: Future,
{
fn is_terminated(&self) -> bool {
self.generator.is_none()
}
}
impl<G> Stream for AsyncStream<G>
where
G: Generator<Return = ()>,
G::Yield: Future,
{
type Item = <G::Yield as Future>::Output;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
// poll future straight away
if let Some(p) = Self::poll_fut(&mut this.fut, cx) {
// return result of poll
return Self::map_fut_poll(p, &mut this.generator, &mut this.fut);
}
// poll generator for next future
if Self::resume_gen(&mut this.generator, &mut this.fut) {
let poll = Self::poll_fut(&mut this.fut, cx).unwrap();
Self::map_fut_poll(poll, &mut this.generator, &mut this.fut)
} else {
Poll::Ready(None)
}
}
}
impl<G> AsyncStream<G>
where
G: Generator<Return = ()>,
G::Yield: Future,
{
/// Maps the poll from the future into a poll for the stream
/// Also eagerly prepares the next future for polling
fn map_fut_poll(
poll: Poll<<G::Yield as Future>::Output>,
opt_gen: &mut Pin<&mut Option<G>>,
opt_fut: &mut Pin<&mut Option<G::Yield>>,
) -> Poll<Option<<G::Yield as Future>::Output>> {
match poll {
Poll::Ready(r) => {
Self::resume_gen(opt_gen, opt_fut);
Poll::Ready(Some(r))
}
Poll::Pending => Poll::Pending,
}
}
/// resumes the generator to get the next future
/// if generator is complete, it's dropped.
/// Returns whether a new future is available to poll
fn resume_gen(
opt_gen: &mut Pin<&mut Option<G>>,
opt_fut: &mut Pin<&mut Option<G::Yield>>,
) -> bool {
match opt_gen.as_mut().as_pin_mut() {
Some(gen) => match gen.resume(()) {
std::ops::GeneratorState::Yielded(i) => {
opt_fut.set(Some(i));
true
}
std::ops::GeneratorState::Complete(_) => {
opt_gen.set(None);
false
}
},
None => false,
}
}
/// polls the future. If the response is ready, it drops the future.
fn poll_fut(
opt_fut: &mut Pin<&mut Option<G::Yield>>,
cx: &mut Context<'_>,
) -> Option<Poll<<G::Yield as Future>::Output>> {
if let Some(fut) = opt_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Poll::Ready(r) => {
opt_fut.set(None);
Some(Poll::Ready(r))
}
Poll::Pending => Some(Poll::Pending),
}
} else {
None
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment