Last active
January 14, 2021 21:16
-
-
Save soruh/ff8f9dfa4f238fe6ed604647abd9100b to your computer and use it in GitHub Desktop.
Buffered Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{Stream, StreamExt}; | |
use std::time::Duration; | |
use std::{collections::VecDeque, pin::Pin}; | |
// Why do the spawned tasks that generate the streams in the code below | |
// not run in parallel with at most 2 tasks running at once? | |
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit(); | |
macro_rules! log { | |
($l: literal, $i: expr) => { | |
log!($l, $i,); | |
}; | |
($l: literal, $i: expr, $($args:expr),*) => { | |
println!(concat!("[{}] [{:.0?}]: ", $l), $i, unsafe {START.assume_init()}.elapsed(), $($args),*); | |
}; | |
} | |
/// Always pulls `n` additional elements from the `stream` (if available) | |
/// (only fills the buffer on the first poll) | |
struct Buffer<St: Stream> { | |
n: usize, | |
stream: St, | |
buffer: VecDeque<St::Item>, | |
done: bool, | |
} | |
impl<St: Stream + Unpin> Stream for Buffer<St> | |
where | |
Self: Unpin, | |
{ | |
type Item = St::Item; | |
fn poll_next( | |
mut self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<Option<Self::Item>> { | |
use std::task::Poll; | |
let mut item = None; | |
if !self.done { | |
while self.buffer.len() < self.n { | |
if let Poll::Ready(next) = Stream::poll_next(Pin::new(&mut self.stream), cx) { | |
if let Some(next) = next { | |
self.buffer.push_back(next); | |
} else { | |
self.done = true; | |
break; | |
} | |
} else { | |
break; | |
} | |
// If we don't already have an item to yield pop one here | |
// and continue filling the buffer | |
if item.is_none() { | |
item = self.buffer.pop_front(); | |
} | |
} | |
} | |
// If we already have an item take that, otherwise try to pop one from the buffer | |
let item = item.or_else(|| self.buffer.pop_front()); | |
if self.done { | |
// We don't have any more items in the source, if the buffer is empty we're done | |
Poll::Ready(item) | |
} else { | |
// The source is not yet exhaused, so we're still `Pending` if the buffer is empty | |
item.map(|item| Poll::Ready(Some(item))) | |
.unwrap_or(Poll::Pending) | |
} | |
} | |
} | |
impl<St: Stream> Buffer<St> { | |
pub fn new(n: usize, stream: St) -> Self { | |
Self { | |
n, | |
stream, | |
buffer: Default::default(), | |
done: false, | |
} | |
} | |
} | |
#[tokio::main(core_threads = 20)] | |
async fn main() { | |
unsafe { | |
*START.as_mut_ptr() = std::time::Instant::now(); | |
} | |
let mut s = Buffer::new( | |
2, | |
futures::stream::iter(1..=10).map(|i| { | |
log!("\x1b[36mCreated\x1b[0m", i); | |
let (mut tx, rx) = tokio::sync::mpsc::channel(1000); | |
tokio::spawn(async move { | |
log!("\x1b[31mSleeping\x1b[0m", i); | |
tokio::time::delay_for(Duration::from_secs(1)).await; | |
log!("\x1b[32mStarting\x1b[0m", i); | |
for x in 0usize..3 { | |
log!("\x1b[33mSending\x1b[0m {}", i, i + x); | |
tx.send(i + x).await.unwrap(); | |
tokio::time::delay_for(Duration::from_secs(1)).await; | |
} | |
}); | |
rx | |
}), | |
); | |
while let Some(mut a) = s.next().await { | |
while let Some(b) = a.recv().await { | |
log!("\x1b[34mReceived\x1b[0m: {:?}", " ", b); | |
} | |
std::mem::drop(a); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{stream::Fuse, Stream, StreamExt}; | |
use std::pin::Pin; | |
use std::time::Duration; | |
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit(); | |
macro_rules! log { | |
($l: literal, $i: expr) => { | |
log!($l, $i,); | |
}; | |
($l: literal, $i: expr, $($args:expr),*) => { | |
if format!("{}", $i) != "B" { | |
println!(concat!("[{}] [{:.0?}]: ", $l), $i, unsafe {START.assume_init()}.elapsed(), $($args),*); | |
} | |
}; | |
} | |
/// Always pulls `n` additional elements from the `stream` (if available) | |
/// (only fills the buffer on the first poll) | |
struct Buffer<St> | |
where | |
St: Stream, | |
{ | |
stream: Fuse<St>, | |
buffer: Box<[Option<St::Item>]>, | |
} | |
impl<St> Stream for Buffer<St> | |
where | |
St: Stream + Unpin, | |
St::Item: Stream + Unpin, | |
{ | |
type Item = <St::Item as Stream>::Item; | |
fn poll_next( | |
mut self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<Option<Self::Item>> { | |
use std::task::Poll; | |
let Self { stream, buffer } = &mut *self; | |
// Try to get a value from one of the already initialized streams | |
let mut item = Self::poll_buffered(buffer, cx); | |
let mut done = false; | |
// Initialize new streams | |
let mut got_new_streams = false; // This could be an array of incicies so we don't need to check everything again | |
if !done { | |
for (i, slot) in buffer.iter_mut().enumerate() { | |
if slot.is_none() { | |
if let Poll::Ready(res) = Stream::poll_next(Pin::new(stream), cx) { | |
if res.is_some() { | |
*slot = res; | |
got_new_streams = true; | |
log!("\x1b[32mGot new stream #{}\x1b[m", "B", i); | |
} else { | |
done = true; | |
break; | |
} | |
} | |
} | |
} | |
} | |
// Attempt to get a value from the newly initialized streams | |
if item.is_none() && got_new_streams { | |
item = Self::poll_buffered(buffer, cx); | |
} | |
if done { | |
log!("\x1b[34msource stream is empty\x1b[m", "B"); | |
} | |
if item.is_some() { | |
Poll::Ready(item) | |
} else { | |
// We're only done if the source stream is done **and** we don't have any buffered streams left | |
if done && buffer.iter().all(|x| x.is_none()) { | |
log!("\x1b[33mdone\x1b[m", "B"); | |
Poll::Ready(None) | |
} else { | |
log!("\x1b[33mEvery Stream is Pending\x1b[m", "B"); | |
Poll::Pending | |
} | |
} | |
} | |
} | |
impl<St> Buffer<St> | |
where | |
St: Stream + Unpin, | |
St::Item: Stream + Unpin, | |
{ | |
fn poll_buffered( | |
buffer: &mut Box<[Option<St::Item>]>, | |
cx: &mut std::task::Context<'_>, | |
) -> Option<<St::Item as Stream>::Item> { | |
use std::task::Poll; | |
for (i, slot) in buffer.iter_mut().enumerate() { | |
if let Some(st) = slot { | |
log!("polling stream #{}", "B", i); | |
if let Poll::Ready(res) = Stream::poll_next(Pin::new(st), cx) { | |
if res.is_some() { | |
log!("\x1b[32mGot value from stream #{}\x1b[m", "B", i); | |
return res; | |
} else { | |
log!("\x1b[33mStream #{} finished\x1b[m", "B", i); | |
*slot = None; | |
} | |
} else { | |
log!("\x1b[30mStream #{} is Pending\x1b[m", "B", i); | |
} | |
} else { | |
log!("Slot #{} is empty", "B", i); | |
} | |
} | |
None | |
} | |
pub fn new(n: usize, stream: St) -> Self { | |
let mut buffer = Vec::with_capacity(n); | |
buffer.extend((0..n).map(|_| None)); | |
Self { | |
stream: stream.fuse(), | |
buffer: buffer.into_boxed_slice(), | |
} | |
} | |
} | |
#[tokio::main(core_threads = 20)] | |
async fn main() { | |
unsafe { | |
*START.as_mut_ptr() = std::time::Instant::now(); | |
} | |
let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); | |
let mut s = Buffer::new( | |
5, | |
futures::stream::iter(1..=10).map(|i| { | |
log!("\x1b[36mCreated\x1b[0m", i); | |
let (mut tx, rx) = tokio::sync::mpsc::channel(1000); | |
let in_flight = in_flight.clone(); | |
tokio::spawn(async move { | |
log!("\x1b[31mSleeping\x1b[0m", i); | |
tokio::time::delay_for(Duration::from_secs(i % 5)).await; | |
log!("\x1b[32mStarting\x1b[0m", i); | |
for x in 0..5 { | |
in_flight.fetch_add(1, std::sync::atomic::Ordering::Relaxed); | |
tx.send(i + x).await.unwrap(); | |
log!("\x1b[33mSending\x1b[0m {}", i, i + x); | |
tokio::time::delay_for(Duration::from_secs(1)).await; | |
} | |
}); | |
rx | |
}), | |
); | |
while let Some(a) = s.next().await { | |
log!("\x1b[34mReceived\x1b[0m: {:?}", " ", a); | |
in_flight.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); | |
} | |
assert_eq!(in_flight.load(std::sync::atomic::Ordering::SeqCst), 0); | |
log!("Ok", " ") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{stream::FusedStream, Stream, StreamExt}; | |
use std::pin::Pin; | |
use std::{ops::DerefMut, time::Duration}; | |
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit(); | |
macro_rules! log { | |
($l: literal, $i: expr) => { | |
log!($l, $i,); | |
}; | |
($l: literal, $i: expr, $($args:expr),*) => { | |
if format!("{}", $i) != "B" { | |
println!(concat!("[{}] [{:.0?}]: ", $l), $i, unsafe {START.assume_init()}.elapsed(), $($args),*); | |
} | |
}; | |
} | |
/// Always pulls `n` additional elements from the `stream` (if available) | |
/// (only fills the buffer on the first poll) | |
struct Buffer<OuterStream> | |
where | |
OuterStream: DerefMut, | |
OuterStream::Target: Stream + FusedStream, | |
{ | |
stream: Pin<OuterStream>, | |
buffer: Box<[Option<<OuterStream::Target as Stream>::Item>]>, | |
} | |
impl<OuterStream, InnerStream, Item> Stream for Buffer<OuterStream> | |
where | |
Self: Unpin, | |
OuterStream: DerefMut, | |
OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream, | |
InnerStream: DerefMut, | |
InnerStream::Target: Stream<Item = Item>, | |
{ | |
type Item = Item; | |
fn poll_next( | |
self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<Option<Self::Item>> { | |
use std::task::Poll; | |
// Destructure ourselves to help the borrow checker | |
// We also Unpin ourselves here. | |
let Self { stream, buffer } = self.get_mut(); | |
// Try to get a value from one of the already initialized streams | |
let mut item = Self::poll_buffered(buffer, cx); | |
// Initialize new streams | |
let mut got_new_streams = false; // This could be an array of incicies so we don't need to check everything again | |
if !stream.is_terminated() { | |
for (i, slot) in buffer.iter_mut().enumerate() { | |
if slot.is_none() { | |
log!("Getting new stream", "B"); | |
if let Poll::Ready(res) = Stream::poll_next(stream.as_mut(), cx) { | |
if res.is_some() { | |
*slot = res; | |
got_new_streams = true; | |
log!("\x1b[32mGot new stream #{}\x1b[m", "B", i); | |
} else { | |
break; | |
} | |
} | |
} | |
} | |
} | |
// Attempt to get a value from the newly initialized streams | |
// Note that we can not claim to be `Poll::Pending` here instead | |
// as there might not be any wakers registered to wake us back up. | |
if item.is_none() && got_new_streams { | |
item = Self::poll_buffered(buffer, cx); | |
} | |
if stream.is_terminated() { | |
log!("\x1b[34msource stream is empty\x1b[m", "B"); | |
} | |
if item.is_some() { | |
Poll::Ready(item) | |
} else { | |
// We're only done if the source stream is empty **and** we don't have any buffered streams left | |
if stream.is_terminated() && buffer.iter().all(|x| x.is_none()) { | |
log!("\x1b[33mdone\x1b[m", "B"); | |
Poll::Ready(None) | |
} else { | |
log!("\x1b[33mEvery Stream is Pending\x1b[m", "B"); | |
Poll::Pending | |
} | |
} | |
} | |
} | |
impl<OuterStream, InnerStream, Item> FusedStream for Buffer<OuterStream> | |
where | |
Self: Unpin, | |
OuterStream: DerefMut, | |
OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream, | |
InnerStream: DerefMut, | |
InnerStream::Target: Stream<Item = Item>, | |
{ | |
fn is_terminated(&self) -> bool { | |
self.stream.is_terminated() && self.buffer.iter().all(|x| x.is_none()) | |
} | |
} | |
impl<OuterStream, InnerStream, Item> Buffer<OuterStream> | |
where | |
Self: Unpin, | |
OuterStream: DerefMut, | |
OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream, | |
InnerStream: DerefMut, | |
InnerStream::Target: Stream<Item = Item>, | |
{ | |
fn poll_buffered( | |
buffer: &mut Box<[Option<Pin<InnerStream>>]>, | |
cx: &mut std::task::Context<'_>, | |
) -> Option<<Self as Stream>::Item> { | |
use std::task::Poll; | |
for (i, slot) in buffer.iter_mut().enumerate() { | |
if let Some(st) = slot { | |
log!("polling stream #{}", "B", i); | |
if let Poll::Ready(res) = Stream::poll_next(st.as_mut(), cx) { | |
if res.is_some() { | |
log!("\x1b[32mGot value from stream #{}\x1b[m", "B", i); | |
return res; | |
} else { | |
log!("\x1b[33mStream #{} finished\x1b[m", "B", i); | |
*slot = None; | |
} | |
} else { | |
log!("\x1b[30mStream #{} is Pending\x1b[m", "B", i); | |
} | |
} else { | |
log!("Slot #{} is empty", "B", i); | |
} | |
} | |
None | |
} | |
pub fn new(n: usize, stream: Pin<OuterStream>) -> Self | |
where | |
OuterStream: futures::stream::FusedStream, | |
{ | |
let mut buffer = Vec::with_capacity(n); | |
buffer.extend((0..n).map(|_| None)); | |
Self { | |
stream, | |
buffer: buffer.into_boxed_slice(), | |
} | |
} | |
} | |
#[tokio::main(core_threads = 20)] | |
async fn main() { | |
unsafe { | |
*START.as_mut_ptr() = std::time::Instant::now(); | |
} | |
let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); | |
let mut s = Buffer::new( | |
5, | |
Box::pin( | |
futures::stream::iter(1..=10) | |
.map(|i| { | |
log!("\x1b[36mCreated\x1b[0m", i); | |
let (mut tx, rx) = tokio::sync::mpsc::channel(1000); | |
let in_flight = in_flight.clone(); | |
tokio::spawn(async move { | |
log!("\x1b[31mSleeping\x1b[0m", i); | |
tokio::time::delay_for(Duration::from_secs(i % 5)).await; | |
log!("\x1b[32mStarting\x1b[0m", i); | |
for x in 0..5 { | |
in_flight.fetch_add(1, std::sync::atomic::Ordering::Relaxed); | |
tx.send(i + x).await.unwrap(); | |
log!("\x1b[33mSending\x1b[0m {}", i, i + x); | |
tokio::time::delay_for(Duration::from_secs(1)).await; | |
} | |
}); | |
Box::pin(rx) | |
}) | |
.fuse(), | |
), | |
); | |
while let Some(a) = s.next().await { | |
log!("\x1b[34mReceived\x1b[0m: {:?}", " ", a); | |
in_flight.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); | |
} | |
assert_eq!(in_flight.load(std::sync::atomic::Ordering::SeqCst), 0); | |
log!("Ok", " ") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment