Skip to content

Instantly share code, notes, and snippets.

@TimLuq
Last active August 9, 2022 13:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save TimLuq/83a35453405f4c6e0f63fb2a0caa9f6e to your computer and use it in GitHub Desktop.
Save TimLuq/83a35453405f4c6e0f63fb2a0caa9f6e to your computer and use it in GitHub Desktop.
use core::future::Future;
use core::pin::Pin;
use core::task::Poll;
use futures_core::Stream;
/// Item to return from stream.
pub struct StreamItem {
pub append_length: usize,
}
/// State shared between each sequential item computation.
#[derive(Default)]
struct StreamItemState {
index: u8,
buffer: Vec<u8>,
}
/// The struct to keep state and implement `Stream`.
pub struct TestStream {
aggregation_state: Option<Vec<u8>>,
item_state: Option<Pin<Box<dyn Future<Output = (StreamItemState, StreamItem)>>>>,
}
impl TestStream {
/// Computes a single item from a state. Returns the modified state and the item after completion.
async fn compute_item(mut state: StreamItemState) -> (StreamItemState, StreamItem) {
let i = state.index;
let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
state.buffer.extend_from_slice(&data);
state.index += 1;
(state, StreamItem {
append_length: data.len()
})
}
/// Create a new instance of this amazing stream.
pub fn new() -> Self {
let init_state: StreamItemState = Default::default();
Self {
aggregation_state: None,
item_state: Some(Box::pin(TestStream::compute_item(init_state)))
}
}
/// After all items have been processed this will return the aggregated data.
pub fn aggregation(&self) -> Option<&Vec<u8>> {
self.aggregation_state.as_ref()
}
}
impl Stream for TestStream {
type Item = StreamItem;
fn poll_next(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context,
) -> core::task::Poll<Option<Self::Item>> {
let slf = std::ops::DerefMut::deref_mut(&mut self);
let (state, res) = if let Some(fut) = slf.item_state.as_mut() {
match Future::poll(fut.as_mut(), cx) {
// return and keep waiting for result
Poll::Pending => return Poll::Pending,
// item computation complete
Poll::Ready((state, res)) => (state, res),
}
} else {
// no items left
return Poll::Ready(None)
};
// prepare next state
slf.item_state = if state.index >= 16 {
slf.aggregation_state = Some(state.buffer);
None
} else {
Some(Box::pin(TestStream::compute_item(state)))
};
// return item
Poll::Ready(Some(res))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment