Skip to content

Instantly share code, notes, and snippets.

@wayslog
Created May 23, 2018 10:01
Show Gist options
  • Save wayslog/8e0d9cd6aaabdf67a8c2601a9df56fe9 to your computer and use it in GitHub Desktop.
Save wayslog/8e0d9cd6aaabdf67a8c2601a9df56fe9 to your computer and use it in GitHub Desktop.
/// A stream adapter that pairs an upstream item stream with a timer and a
/// store.
///
/// The `Stream` implementation for this type polls `interval` first; when it
/// fires, the result of `store.trancate()` is yielded. Otherwise items pulled
/// from `rx` are handed to `store.store(..)`.
pub struct BufferInterval<T: Stream<Item = I>, S: Store<Item = I>, I: Clone> {
    /// Timer polled at the start of every loop iteration of `poll`.
    interval: Interval,
    /// Upstream source of items that get pushed into `store`.
    rx: T,
    /// Accumulator receiving every item read from `rx`.
    store: S,
}
/// Streams the value returned by `store.trancate()` each time the interval
/// fires, feeding items from `rx` into the store in between.
impl<T, S, I> Stream for BufferInterval<T, S, I>
where
    T: Stream<Item = I>,
    S: Store<Item = I>,
    I: Clone,
{
    type Item = S;
    type Error = Error;

    /// Drives both the timer and the upstream stream.
    ///
    /// Each loop iteration:
    /// 1. Polls `interval`. If it is ready, returns `store.trancate()` as the
    ///    next item (presumably the buffered contents, with the store reset —
    ///    confirm against the `Store` implementation).
    /// 2. Otherwise polls `rx`. A ready item is stored and the loop repeats;
    ///    `NotReady` and end-of-stream are returned to the caller as-is.
    ///
    /// Errors from either inner poll are propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` if `interval` ever reports end-of-stream.
    ///
    /// NOTE(review): when `rx` finishes, `Ready(None)` is returned right away
    /// without calling `trancate()`, so anything still held in `store` is not
    /// yielded — confirm this is intended.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>> {
        loop {
            // The timer is checked before the upstream on every iteration, so
            // a continuously ready `rx` cannot postpone a due tick.
            if let Async::Ready(tick) = self.interval.poll()? {
                return match tick {
                    Some(_) => Ok(Async::Ready(Some(self.store.trancate()))),
                    None => unreachable!(),
                };
            }
            trace!("not until to trancate");

            // Timer still pending: move one upstream item into the store, or
            // hand the upstream's pending/finished state back to the caller.
            match self.rx.poll()? {
                Async::Ready(Some(item)) => self.store.store(item),
                Async::Ready(None) => return Ok(Async::Ready(None)),
                Async::NotReady => return Ok(Async::NotReady),
            };
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment