Skip to content

Instantly share code, notes, and snippets.

@soruh
Last active January 14, 2021 21:16
Show Gist options
  • Save soruh/ff8f9dfa4f238fe6ed604647abd9100b to your computer and use it in GitHub Desktop.
Save soruh/ff8f9dfa4f238fe6ed604647abd9100b to your computer and use it in GitHub Desktop.
Buffered Stream
use futures::{Stream, StreamExt};
use std::time::Duration;
use std::{collections::VecDeque, pin::Pin};
// Why do the spawned tasks that generate the streams in the code below
// not run in parallel with at most 2 tasks running at once?
/// Reference instant for the elapsed-time column printed by `log!`.
///
/// It is written at the start of `main`. `log!` reads it with `assume_init`, so
/// `log!` must not be invoked before that write has happened.
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit();
/// Prints a timestamped line of the form `[<tag>] [<elapsed since START>]: <message>`.
///
/// `$fmt` is a string literal whose placeholders are filled by the trailing
/// `$arg`s. The two-argument form logs `$fmt` with no extra arguments.
/// Reads `START`, so it must only be used after `main` has initialized it.
macro_rules! log {
    ($fmt: literal, $tag: expr) => {
        log!($fmt, $tag,);
    };
    ($fmt: literal, $tag: expr, $($arg:expr),*) => {
        println!(
            concat!("[{}] [{:.0?}]: ", $fmt),
            $tag,
            unsafe { START.assume_init() }.elapsed(),
            $($arg),*
        );
    };
}
/// Stream adapter that reads ahead of its consumer.
///
/// On every poll it tries to top `buffer` up to `n` elements pulled from
/// `stream`. It stops early when the source is not ready or has ended, and
/// yields the oldest buffered element.
struct Buffer<St: Stream> {
    /// Number of elements to keep queued ahead of the consumer.
    n: usize,
    /// The source stream.
    stream: St,
    /// Elements already pulled from `stream` but not yet yielded.
    buffer: VecDeque<St::Item>,
    /// Set once `stream` has returned `None`; the source is not polled afterwards.
    done: bool,
}
impl<St: Stream + Unpin> Stream for Buffer<St>
where
    Self: Unpin,
{
    type Item = St::Item;

    /// Yields the next element while keeping up to `n` further elements of the
    /// source buffered.
    ///
    /// Each call tops the buffer up to `n` elements, stopping early when the
    /// source is `Pending` or exhausted, and returns the oldest element. The
    /// source is always polled until at least one element is available.
    /// Therefore, with `n == 0` the adapter passes elements straight through.
    /// It never returns `Pending` without having polled the source, which would
    /// leave no waker registered and hang the consumer.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;
        let mut item = None;
        if !self.done {
            loop {
                // Stop once the buffer is full, but never before we have something
                // to yield (this is what makes `n == 0` work).
                let have_item = item.is_some() || !self.buffer.is_empty();
                if self.buffer.len() >= self.n && have_item {
                    break;
                }
                match Stream::poll_next(Pin::new(&mut self.stream), cx) {
                    Poll::Ready(Some(next)) => self.buffer.push_back(next),
                    Poll::Ready(None) => {
                        self.done = true;
                        break;
                    }
                    // The source has registered our waker; try again once it fires.
                    Poll::Pending => break,
                }
                // If we don't already have an item to yield, take the oldest one
                // now and keep filling the buffer behind it.
                if item.is_none() {
                    item = self.buffer.pop_front();
                }
            }
        }
        // If we already have an item take that, otherwise try to pop one from the buffer.
        let item = item.or_else(|| self.buffer.pop_front());
        match item {
            Some(item) => Poll::Ready(Some(item)),
            // The source is exhausted and the buffer is empty: we're done.
            None if self.done => Poll::Ready(None),
            // The source is not exhausted yet, so an empty buffer means `Pending`.
            None => Poll::Pending,
        }
    }
}
impl<St: Stream> Buffer<St> {
pub fn new(n: usize, stream: St) -> Self {
Self {
n,
stream,
buffer: Default::default(),
done: false,
}
}
}
#[tokio::main(core_threads = 20)]
async fn main() {
    // Record the reference instant before anything can call `log!`.
    unsafe {
        START = std::mem::MaybeUninit::new(std::time::Instant::now());
    }

    // Creates the channel for source element `i` and spawns a task that, after
    // one second, sends `i`, `i + 1`, `i + 2` one second apart.
    let spawn_worker = |i: usize| {
        log!("\x1b[36mCreated\x1b[0m", i);
        let (mut tx, rx) = tokio::sync::mpsc::channel(1000);
        tokio::spawn(async move {
            log!("\x1b[31mSleeping\x1b[0m", i);
            tokio::time::delay_for(Duration::from_secs(1)).await;
            log!("\x1b[32mStarting\x1b[0m", i);
            for offset in 0usize..3 {
                log!("\x1b[33mSending\x1b[0m {}", i, i + offset);
                tx.send(i + offset).await.unwrap();
                tokio::time::delay_for(Duration::from_secs(1)).await;
            }
        });
        rx
    };

    let mut receivers = Buffer::new(2, futures::stream::iter(1..=10).map(spawn_worker));
    // Drain each receiver completely before asking for the next one; `rx` is
    // dropped at the end of every outer iteration.
    while let Some(mut rx) = receivers.next().await {
        while let Some(value) = rx.recv().await {
            log!("\x1b[34mReceived\x1b[0m: {:?}", " ", value);
        }
    }
}
use futures::{stream::Fuse, Stream, StreamExt};
use std::pin::Pin;
use std::time::Duration;
/// Reference instant for the elapsed-time column printed by `log!`.
///
/// It is written at the start of `main`. `log!` reads it with `assume_init`, so
/// `log!` must not be invoked before that write has happened.
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit();
/// Prints a timestamped line of the form `[<tag>] [<elapsed since START>]: <message>`.
///
/// `$l` is a string literal whose placeholders are filled by the trailing
/// `$args`. Lines whose tag displays as `"B"` are suppressed, which silences
/// the adapter's internal tracing. The tag expression `$i` is evaluated exactly
/// once: it is bound by reference before being both compared and printed.
/// Reads `START`, so it must only be used after `main` has initialized it.
macro_rules! log {
    ($l: literal, $i: expr) => {
        log!($l, $i,);
    };
    ($l: literal, $i: expr, $($args:expr),*) => {
        match &$i {
            tag => {
                if tag.to_string() != "B" {
                    println!(concat!("[{}] [{:.0?}]: ", $l), tag, unsafe {START.assume_init()}.elapsed(), $($args),*);
                }
            }
        }
    };
}
/// Flattens a stream of streams while keeping up to `n` inner streams active
/// at once, where `n` is the slot count passed to `new`.
///
/// Each occupied slot of `buffer` holds one inner stream taken from `stream`.
/// Values are yielded from the occupied slots in index order (the first ready
/// slot wins), and empty slots are refilled from `stream`.
struct Buffer<St>
where
    St: Stream,
{
    /// Source of inner streams. It is fused so it may safely be polled again
    /// after it has returned `None`.
    stream: Fuse<St>,
    /// One slot per concurrently active inner stream; `None` marks a free slot.
    buffer: Box<[Option<St::Item>]>,
}
impl<St> Stream for Buffer<St>
where
    St: Stream + Unpin,
    St::Item: Stream + Unpin,
{
    type Item = <St::Item as Stream>::Item;

    /// Yields the next value produced by any buffered inner stream.
    ///
    /// Each round polls the occupied slots, then refills empty slots from the
    /// source. If new inner streams were obtained but no value was ready, the
    /// round repeats: the new streams have not been polled yet, and the source
    /// returned `Ready`, so neither has arranged a wake-up. Without repeating,
    /// returning `Pending` could hang when a new stream finishes immediately.
    /// `Pending` is returned only after every occupied slot, and the source
    /// (when a slot is free), has returned `Pending`. `Ready(None)` is returned
    /// once the source is exhausted and every slot is empty.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;
        let Self { stream, buffer } = &mut *self;
        loop {
            // Try to get a value from one of the already initialized streams.
            let item = Self::poll_buffered(buffer, cx);

            // Refill empty slots from the source. `stream` is fused, so polling
            // it after exhaustion keeps returning `Ready(None)`.
            let mut done = false;
            let mut got_new_streams = false;
            for (i, slot) in buffer.iter_mut().enumerate() {
                if slot.is_some() {
                    continue;
                }
                match Stream::poll_next(Pin::new(&mut *stream), cx) {
                    Poll::Ready(Some(new_stream)) => {
                        *slot = Some(new_stream);
                        got_new_streams = true;
                        log!("\x1b[32mGot new stream #{}\x1b[m", "B", i);
                    }
                    Poll::Ready(None) => {
                        done = true;
                        break;
                    }
                    // The source is not ready; asking again in this round is pointless.
                    Poll::Pending => break,
                }
            }
            if done {
                log!("\x1b[34msource stream is empty\x1b[m", "B");
            }

            if let Some(item) = item {
                return Poll::Ready(Some(item));
            }
            if got_new_streams {
                // Poll the freshly added streams (and refill again if any of them
                // finished immediately) before we may report `Pending`.
                continue;
            }
            // We're only done if the source stream is done **and** we don't have any buffered streams left
            return if done && buffer.iter().all(|x| x.is_none()) {
                log!("\x1b[33mdone\x1b[m", "B");
                Poll::Ready(None)
            } else {
                log!("\x1b[33mEvery Stream is Pending\x1b[m", "B");
                Poll::Pending
            };
        }
    }
}
impl<St> Buffer<St>
where
    St: Stream + Unpin,
    St::Item: Stream + Unpin,
{
    /// Polls the occupied slots in index order and returns the first value any
    /// inner stream has ready.
    ///
    /// An inner stream that has ended is removed from its slot. When this
    /// returns `None`, every remaining inner stream has returned `Pending`,
    /// which under the `Stream` contract schedules a wake-up through `cx`.
    fn poll_buffered(
        buffer: &mut [Option<St::Item>],
        cx: &mut std::task::Context<'_>,
    ) -> Option<<St::Item as Stream>::Item> {
        use std::task::Poll;
        for (i, slot) in buffer.iter_mut().enumerate() {
            let st = match slot {
                Some(st) => st,
                None => {
                    log!("Slot #{} is empty", "B", i);
                    continue;
                }
            };
            log!("polling stream #{}", "B", i);
            match Stream::poll_next(Pin::new(st), cx) {
                Poll::Ready(Some(value)) => {
                    log!("\x1b[32mGot value from stream #{}\x1b[m", "B", i);
                    return Some(value);
                }
                Poll::Ready(None) => {
                    log!("\x1b[33mStream #{} finished\x1b[m", "B", i);
                    *slot = None;
                }
                Poll::Pending => {
                    log!("\x1b[30mStream #{} is Pending\x1b[m", "B", i);
                }
            }
        }
        None
    }

    /// Creates a buffer with `n` slots, so at most `n` inner streams produced
    /// by `stream` are active at once.
    ///
    /// # Panics
    ///
    /// Panics if `n == 0`. With no slots, no inner stream could ever be polled,
    /// and the buffer would return `Pending` forever without arranging a wake-up.
    pub fn new(n: usize, stream: St) -> Self {
        assert!(n > 0, "Buffer::new: `n` must be at least 1");
        Self {
            stream: stream.fuse(),
            buffer: std::iter::repeat_with(|| None).take(n).collect(),
        }
    }
}
#[tokio::main(core_threads = 20)]
async fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Record the reference instant before anything can call `log!`.
    unsafe {
        START = std::mem::MaybeUninit::new(std::time::Instant::now());
    }

    // Incremented by a worker just before each send and decremented here after
    // each receive; it must be back at zero once everything is drained.
    let in_flight = Arc::new(AtomicUsize::new(0));

    // Creates the channel for source element `i` and spawns a task that waits
    // `i % 5` seconds, then sends five values one second apart.
    let spawn_worker = |i: u64| {
        log!("\x1b[36mCreated\x1b[0m", i);
        let (mut tx, rx) = tokio::sync::mpsc::channel(1000);
        let counter = Arc::clone(&in_flight);
        tokio::spawn(async move {
            log!("\x1b[31mSleeping\x1b[0m", i);
            tokio::time::delay_for(Duration::from_secs(i % 5)).await;
            log!("\x1b[32mStarting\x1b[0m", i);
            for offset in 0..5 {
                counter.fetch_add(1, Ordering::Relaxed);
                tx.send(i + offset).await.unwrap();
                log!("\x1b[33mSending\x1b[0m {}", i, i + offset);
                tokio::time::delay_for(Duration::from_secs(1)).await;
            }
        });
        rx
    };

    let mut merged = Buffer::new(5, futures::stream::iter(1..=10).map(spawn_worker));
    while let Some(value) = merged.next().await {
        log!("\x1b[34mReceived\x1b[0m: {:?}", " ", value);
        in_flight.fetch_sub(1, Ordering::Relaxed);
    }
    assert_eq!(in_flight.load(Ordering::SeqCst), 0);
    log!("Ok", " ")
}
use futures::{stream::FusedStream, Stream, StreamExt};
use std::pin::Pin;
use std::{ops::DerefMut, time::Duration};
/// Reference instant for the elapsed-time column printed by `log!`.
///
/// It is written at the start of `main`. `log!` reads it with `assume_init`, so
/// `log!` must not be invoked before that write has happened.
static mut START: std::mem::MaybeUninit<std::time::Instant> = std::mem::MaybeUninit::uninit();
/// Prints a timestamped line of the form `[<tag>] [<elapsed since START>]: <message>`.
///
/// `$l` is a string literal whose placeholders are filled by the trailing
/// `$args`. Lines whose tag displays as `"B"` are suppressed, which silences
/// the adapter's internal tracing. The tag expression `$i` is evaluated exactly
/// once: it is bound by reference before being both compared and printed.
/// Reads `START`, so it must only be used after `main` has initialized it.
macro_rules! log {
    ($l: literal, $i: expr) => {
        log!($l, $i,);
    };
    ($l: literal, $i: expr, $($args:expr),*) => {
        match &$i {
            tag => {
                if tag.to_string() != "B" {
                    println!(concat!("[{}] [{:.0?}]: ", $l), tag, unsafe {START.assume_init()}.elapsed(), $($args),*);
                }
            }
        }
    };
}
/// Flattens a fused stream of pinned inner streams while keeping up to `n`
/// inner streams active at once, where `n` is the slot count passed to `new`.
///
/// Each occupied slot of `buffer` holds one inner stream taken from `stream`.
/// Values are yielded from the occupied slots in index order (the first ready
/// slot wins), and empty slots are refilled from `stream` until it reports
/// `is_terminated`.
struct Buffer<OuterStream>
where
    OuterStream: DerefMut,
    OuterStream::Target: Stream + FusedStream,
{
    /// Pinned source of inner streams; `FusedStream::is_terminated` signals
    /// that no further inner streams will arrive.
    stream: Pin<OuterStream>,
    /// One slot per concurrently active inner stream; `None` marks a free slot.
    buffer: Box<[Option<<OuterStream::Target as Stream>::Item>]>,
}
impl<OuterStream, InnerStream, Item> Stream for Buffer<OuterStream>
where
    Self: Unpin,
    OuterStream: DerefMut,
    OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream,
    InnerStream: DerefMut,
    InnerStream::Target: Stream<Item = Item>,
{
    type Item = Item;

    /// Yields the next value produced by any buffered inner stream.
    ///
    /// Each round polls the occupied slots, then refills empty slots from the
    /// source while it is not terminated. If new inner streams were obtained
    /// but no value was ready, the round repeats: the new streams have not been
    /// polled yet, and the source returned `Ready`, so neither has arranged a
    /// wake-up. Without repeating, returning `Pending` could hang when a new
    /// stream finishes immediately. `Pending` is returned only after every
    /// occupied slot, and the source (when a slot is free), has returned
    /// `Pending`. `Ready(None)` is returned once the source is terminated and
    /// every slot is empty.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;
        // Destructure ourselves to help the borrow checker.
        // `get_mut` is available because `Self: Unpin`.
        let Self { stream, buffer } = self.get_mut();
        loop {
            // Try to get a value from one of the already initialized streams.
            let item = Self::poll_buffered(buffer, cx);

            // Refill empty slots with new inner streams from the source.
            let mut got_new_streams = false;
            if !stream.is_terminated() {
                for (i, slot) in buffer.iter_mut().enumerate() {
                    if slot.is_some() {
                        continue;
                    }
                    log!("Getting new stream", "B");
                    match Stream::poll_next(stream.as_mut(), cx) {
                        Poll::Ready(Some(new_stream)) => {
                            *slot = Some(new_stream);
                            got_new_streams = true;
                            log!("\x1b[32mGot new stream #{}\x1b[m", "B", i);
                        }
                        // Either the source just ended (a well-behaved `FusedStream`
                        // now reports `is_terminated`) or it is not ready yet.
                        Poll::Ready(None) | Poll::Pending => break,
                    }
                }
            }
            if stream.is_terminated() {
                log!("\x1b[34msource stream is empty\x1b[m", "B");
            }

            if let Some(item) = item {
                return Poll::Ready(Some(item));
            }
            if got_new_streams {
                // Poll the freshly added streams (and refill again if any of them
                // finished immediately) before we may report `Pending`.
                continue;
            }
            // We're only done if the source stream is empty **and** we don't have any buffered streams left
            return if stream.is_terminated() && buffer.iter().all(|x| x.is_none()) {
                log!("\x1b[33mdone\x1b[m", "B");
                Poll::Ready(None)
            } else {
                log!("\x1b[33mEvery Stream is Pending\x1b[m", "B");
                Poll::Pending
            };
        }
    }
}
impl<OuterStream, InnerStream, Item> FusedStream for Buffer<OuterStream>
where
    Self: Unpin,
    OuterStream: DerefMut,
    OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream,
    InnerStream: DerefMut,
    InnerStream::Target: Stream<Item = Item>,
{
    /// The buffer is finished only when the source can produce no more inner
    /// streams and no inner stream is still held in any slot.
    fn is_terminated(&self) -> bool {
        if !self.stream.is_terminated() {
            return false;
        }
        self.buffer.iter().all(Option::is_none)
    }
}
impl<OuterStream, InnerStream, Item> Buffer<OuterStream>
where
    Self: Unpin,
    OuterStream: DerefMut,
    OuterStream::Target: Stream<Item = Pin<InnerStream>> + FusedStream,
    InnerStream: DerefMut,
    InnerStream::Target: Stream<Item = Item>,
{
    /// Polls the occupied slots in index order and returns the first value any
    /// inner stream has ready.
    ///
    /// An inner stream that has ended is removed from its slot. When this
    /// returns `None`, every remaining inner stream has returned `Pending`,
    /// which under the `Stream` contract schedules a wake-up through `cx`.
    fn poll_buffered(
        buffer: &mut [Option<Pin<InnerStream>>],
        cx: &mut std::task::Context<'_>,
    ) -> Option<<Self as Stream>::Item> {
        use std::task::Poll;
        for (i, slot) in buffer.iter_mut().enumerate() {
            let st = match slot {
                Some(st) => st,
                None => {
                    log!("Slot #{} is empty", "B", i);
                    continue;
                }
            };
            log!("polling stream #{}", "B", i);
            match Stream::poll_next(st.as_mut(), cx) {
                Poll::Ready(Some(value)) => {
                    log!("\x1b[32mGot value from stream #{}\x1b[m", "B", i);
                    return Some(value);
                }
                Poll::Ready(None) => {
                    log!("\x1b[33mStream #{} finished\x1b[m", "B", i);
                    *slot = None;
                }
                Poll::Pending => {
                    log!("\x1b[30mStream #{} is Pending\x1b[m", "B", i);
                }
            }
        }
        None
    }

    /// Creates a buffer with `n` slots over the pinned, fused source `stream`,
    /// so at most `n` inner streams are active at once.
    ///
    /// Only the pointee has to be a `FusedStream`; the pointer type itself
    /// needs no extra bound.
    ///
    /// # Panics
    ///
    /// Panics if `n == 0`. With no slots, no inner stream could ever be polled,
    /// and the buffer would return `Pending` forever without arranging a wake-up.
    pub fn new(n: usize, stream: Pin<OuterStream>) -> Self {
        assert!(n > 0, "Buffer::new: `n` must be at least 1");
        Self {
            stream,
            buffer: std::iter::repeat_with(|| None).take(n).collect(),
        }
    }
}
#[tokio::main(core_threads = 20)]
async fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Record the reference instant before anything can call `log!`.
    unsafe {
        START = std::mem::MaybeUninit::new(std::time::Instant::now());
    }

    // Incremented by a worker just before each send and decremented here after
    // each receive; it must be back at zero once everything is drained.
    let in_flight = Arc::new(AtomicUsize::new(0));

    // Creates the channel for source element `i`, spawns a task that waits
    // `i % 5` seconds and then sends five values one second apart, and hands
    // back the pinned receiver.
    let spawn_worker = |i: u64| {
        log!("\x1b[36mCreated\x1b[0m", i);
        let (mut tx, rx) = tokio::sync::mpsc::channel(1000);
        let counter = Arc::clone(&in_flight);
        tokio::spawn(async move {
            log!("\x1b[31mSleeping\x1b[0m", i);
            tokio::time::delay_for(Duration::from_secs(i % 5)).await;
            log!("\x1b[32mStarting\x1b[0m", i);
            for offset in 0..5 {
                counter.fetch_add(1, Ordering::Relaxed);
                tx.send(i + offset).await.unwrap();
                log!("\x1b[33mSending\x1b[0m {}", i, i + offset);
                tokio::time::delay_for(Duration::from_secs(1)).await;
            }
        });
        Box::pin(rx)
    };

    let source = Box::pin(futures::stream::iter(1..=10).map(spawn_worker).fuse());
    let mut merged = Buffer::new(5, source);
    while let Some(value) = merged.next().await {
        log!("\x1b[34mReceived\x1b[0m: {:?}", " ", value);
        in_flight.fetch_sub(1, Ordering::Relaxed);
    }
    assert_eq!(in_flight.load(Ordering::SeqCst), 0);
    log!("Ok", " ")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment