Skip to content

Instantly share code, notes, and snippets.

@ithinuel
Created July 26, 2020 16:31
Show Gist options
  • Save ithinuel/528f2290e310be2d9c115507c4b63a40 to your computer and use it in GitHub Desktop.
Save ithinuel/528f2290e310be2d9c115507c4b63a40 to your computer and use it in GitHub Desktop.
Similar to `Peekable`, but the item is fetched first and a single item can then be given back to the stream (a possible evolution: back it with a circular buffer to allow more than one).
use futures::executor::block_on;
use futures::stream::{iter, StreamExt}; // 0.3.5
use pushbackable::*;
/// Demo driver: streams the characters of `"Hello"`, drains the stream, then
/// pushes items back and reads from the same stream again.
fn main() {
    // Every character is wrapped in a `PendOnce`, so each item is reported as
    // pending once before it is yielded.
    let mut strm = iter("Hello".chars())
        .then(pend_once::PendOnce::new)
        .push_backable();

    // Request up to six items from the five-character source.
    let drained = block_on(strm.by_ref().take(6).collect::<Vec<_>>());
    println!("{:?}", drained);

    // `push_back` hands back whatever was already waiting in the slot, so the
    // second call displaces the item stored by the first one.
    let displaced_by_b = strm.push_back('b');
    println!("{:?}", displaced_by_b);
    let displaced_by_e_acute = strm.push_back('é');
    println!("{:?}", displaced_by_e_acute);

    // Read again from the same stream to see what is replayed.
    let replayed = block_on(strm.by_ref().take(2).collect::<Vec<_>>());
    println!("{:?}", replayed);
}
// =============================================================================
/// A minimal future used to exercise consumers with a value that is not
/// immediately ready.
mod pend_once {
    use pin_project::pin_project; // 0.4.22
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Future that reports `Poll::Pending` on its first poll and resolves to a
    /// copy of the wrapped value on every poll after that.
    #[pin_project]
    pub(super) struct PendOnce<V> {
        /// Value handed out once the future resolves.
        v: V,
        /// `true` once `poll` has been called at least once.
        polled_before: bool,
    }

    impl<V> PendOnce<V> {
        /// Wraps `v` in a future that has not been polled yet.
        pub fn new(v: V) -> Self {
            Self {
                v,
                polled_before: false,
            }
        }
    }

    impl<V: Copy> Future for PendOnce<V> {
        type Output = V;

        /// Returns `Poll::Pending` the first time it is called and
        /// `Poll::Ready` with a copy of the value on every later call.
        fn poll(self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.project();
            if *this.polled_before {
                Poll::Ready(*this.v)
            } else {
                *this.polled_before = true;
                // Nothing external will ever wake this task, so request an
                // immediate re-poll before reporting `Pending`.
                c.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
/// Stream adaptor with a one-item "push back" slot.
mod pushbackable {
    use futures::Stream; // 0.3.5
    use pin_project::pin_project; // 0.4.22
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Wraps a stream and lets the consumer hand a single item back to it.
    ///
    /// A pushed-back item is yielded by the next `poll_next` call, before the
    /// wrapped stream is polled again.
    #[pin_project]
    pub(super) struct PushBackable<S: Stream> {
        /// The wrapped stream.
        #[pin]
        stream: S,
        /// The pushed-back item waiting to be yielded, if any.
        val: Option<S::Item>,
        /// Set once `stream` has yielded `None`, so that it is never polled
        /// again after it has finished.
        exhausted: bool,
    }

    impl<S: Stream> PushBackable<S> {
        /// Stores `v` so that it is the next item yielded by this stream.
        ///
        /// Only one item can be held at a time: if an item was already
        /// waiting, it is replaced by `v` and returned; otherwise `None` is
        /// returned.
        pub fn push_back(&mut self, v: S::Item) -> Option<S::Item> {
            self.val.replace(v)
        }
    }

    impl<S: Stream> Stream for PushBackable<S> {
        type Item = S::Item;

        /// Yields the pushed-back item if there is one, otherwise polls the
        /// wrapped stream.
        fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            let this = self.project();
            if let Some(v) = this.val.take() {
                return Poll::Ready(Some(v));
            }
            // Items may be pushed back after the wrapped stream has ended, so
            // this adaptor can legitimately be polled past that point. Polling
            // a finished stream is outside the `Stream` contract; answer from
            // the flag instead of forwarding the poll.
            if *this.exhausted {
                return Poll::Ready(None);
            }
            let polled = this.stream.poll_next(ctx);
            if matches!(polled, Poll::Ready(None)) {
                *this.exhausted = true;
            }
            polled
        }

        /// Returns the wrapped stream's bounds, widened by one when a
        /// pushed-back item is waiting.
        fn size_hint(&self) -> (usize, Option<usize>) {
            let pending = usize::from(self.val.is_some());
            if self.exhausted {
                return (pending, Some(pending));
            }
            let (lower, upper) = self.stream.size_hint();
            (
                lower.saturating_add(pending),
                upper.and_then(|u| u.checked_add(pending)),
            )
        }
    }

    /// Extension trait that adds `push_backable` to every stream.
    pub(super) trait MyStreamExt: Stream {
        /// Wraps `self` in a `PushBackable` whose push-back slot is empty.
        fn push_backable(self) -> PushBackable<Self>
        where
            Self: Sized,
        {
            PushBackable {
                stream: self,
                val: None,
                exhausted: false,
            }
        }
    }

    impl<T: ?Sized> MyStreamExt for T where T: Stream {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment