Created
December 17, 2023 22:35
-
-
Save the8472/b16843886c7e1f276ca89e06b39faaec to your computer and use it in GitHub Desktop.
async iter perf for small items
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(coroutines)] | |
#![feature(coroutine_trait)] | |
#![feature(unchecked_math)] | |
#![feature(decl_macro)] | |
#![feature(noop_waker)] | |
use std::future::Future; | |
use std::thread::yield_now; | |
use std::time::Duration; | |
use std::{task::{Poll, Context, Waker}, pin::{Pin, pin}}; | |
use criterion::{black_box, criterion_group, criterion_main, Criterion}; | |
use rand::{Rng, RngCore}; | |
/// Minimal async-iterator abstraction used by the benchmarks in this file.
///
/// In addition to the usual poll-based entry point it offers a synchronous
/// fast path, [`AsyncIterator::try_more`], which takes no task context.
trait AsyncIterator {
    /// The type of the values produced by the iterator.
    type Item;

    /// Polls for the next item.
    ///
    /// Returns `Poll::Ready(Some(item))` when an item is available,
    /// `Poll::Ready(None)` when the iterator is exhausted, and
    /// `Poll::Pending` when the caller has to poll again later.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Tries to obtain the next item without a task context.
    ///
    /// `None` only means that nothing could be handed out synchronously; the
    /// `more` desugaring in this file falls back to `poll_next` in that case
    /// to find out whether the iterator is pending or finished.
    fn try_more(self: Pin<&mut Self>) -> Option<Self::Item>;
}
/// In-memory byte source used as the benchmark subject.
///
/// Bytes are handed out one at a time from `buf`; once the buffer has been
/// read completely it can be "refilled" a limited number of times.
struct BufferedByteStream {
    /// Fixed-size 4096-byte buffer holding the bytes currently available.
    buf: Box<[u8; 4096]>,
    /// Number of refills that may still be performed.
    /// (The spelling is kept as-is because other code in this file refers to
    /// the field by this name.)
    remainin_chunks: usize,
    /// Index into `buf` of the next byte to hand out.
    idx: usize,
}
impl BufferedByteStream { | |
fn new() -> Self { | |
let mut b = BufferedByteStream { | |
remainin_chunks: 3, | |
idx: 0, | |
buf: Box::new([0; 4096]), | |
}; | |
b.reinit(); | |
b | |
} | |
fn reinit(&mut self) -> &mut Self { | |
let mut rng = rand::thread_rng(); | |
rng.fill_bytes(self.buf.as_mut_slice()); | |
self.rewind(); | |
self | |
} | |
fn rewind(&mut self) { | |
self.idx = 0; | |
self.remainin_chunks = 3; | |
} | |
fn refill(&mut self) { | |
let mut rng = rand::thread_rng(); | |
let val: u8 = rng.gen(); | |
let idx = rng.gen_range(0..self.buf.len()); | |
self.buf[idx] = val; | |
self.idx = 0; | |
self.remainin_chunks -= 1; | |
} | |
} | |
impl AsyncIterator for BufferedByteStream { | |
type Item = u8; | |
#[inline] | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let idx = self.idx; | |
if self.buf.len() != idx { | |
unsafe {self.as_mut().idx = idx.unchecked_add(1)}; | |
return Poll::Ready(Some(unsafe { self.buf.as_ptr().add(idx).read() })); | |
} | |
if self.remainin_chunks > 0 { | |
// some "IO" | |
self.as_mut().refill(); | |
// pretend that we're waiting for the stuff that happens above | |
return Poll::Pending; | |
} | |
return Poll::Ready(None) | |
} | |
#[inline] | |
fn try_more(mut self: Pin<&mut Self>) -> Option<Self::Item> { | |
let idx = self.idx; | |
if self.buf.len() != idx { | |
unsafe {self.idx = idx.unchecked_add(1)}; | |
return Some(unsafe { self.buf.as_ptr().add(idx).read() }) | |
} | |
return None | |
} | |
} | |
impl Iterator for BufferedByteStream { | |
type Item = u8; | |
#[inline] | |
fn next(&mut self) -> Option<Self::Item> { | |
let idx = self.idx; | |
if self.buf.len() != idx { | |
unsafe {self.idx = idx.unchecked_add(1)}; | |
return Some(unsafe { self.buf.as_ptr().add(idx).read() }) | |
} | |
if self.remainin_chunks > 0 { | |
self.refill(); | |
self.idx = 1; | |
return Some(self.buf[0]) | |
} | |
return None; | |
} | |
#[inline] | |
fn fold<B, F>(mut self, init: B, mut f: F) -> B | |
where | |
Self: Sized, | |
F: FnMut(B, Self::Item) -> B, { | |
let mut acc = init; | |
loop { | |
acc = self.buf.iter().copied().fold(acc, &mut f); | |
if self.remainin_chunks > 0 { | |
// some "IO" | |
self.refill(); | |
// parity to the async version | |
yield_now(); | |
} else { | |
break; | |
} | |
} | |
acc | |
} | |
} | |
async fn sum_simple(stream: impl AsyncIterator<Item=u8>) -> usize { | |
let mut stream = pin!(stream); | |
let mut sum: usize = 0; | |
desugar_loop!(for simple i in stream => { sum = sum.wrapping_add(i as usize); }); | |
sum | |
} | |
async fn sum_more(stream: impl AsyncIterator<Item=u8>) -> usize { | |
let mut stream = pin!(stream); | |
let mut sum: usize = 0; | |
desugar_loop!(for more i in stream.as_mut() => { sum = sum.wrapping_add(i as usize); }); | |
sum | |
} | |
/// Sums all bytes produced by `it`, widening each byte to `usize` first.
fn sum_iter(it: impl Iterator<Item = u8>) -> usize {
    it.map(usize::from).sum()
}
macro desugar_loop { | |
(for simple $i:ident in $it: expr => $body:block) => { | |
use std::future::poll_fn; | |
while let Some($i) = poll_fn(|cx| $it.as_mut().poll_next(cx)).await { | |
$body | |
} | |
}, | |
(for more $i:ident in $it: expr => $body:block) => { | |
use std::future::poll_fn; | |
loop { | |
if let Some($i) = $it.try_more() { | |
$body | |
continue; | |
} | |
if let Some($i) = poll_fn(|cx| $it.poll_next(cx)).await { | |
$body | |
} else { | |
break | |
} | |
} | |
} | |
} | |
pub fn criterion_benchmark(c: &mut Criterion) { | |
c.bench_function("iter", |b| { | |
let mut sum = 0; | |
b.iter_batched(|| BufferedByteStream::new(), |it| { | |
let it = black_box( it); | |
sum = sum_iter(it); | |
sum | |
}, criterion::BatchSize::SmallInput); | |
}); | |
c.bench_function("iter by ref", |b| { | |
let mut sum = 0; | |
b.iter_batched_ref(|| BufferedByteStream::new(), |mut it| { | |
let mut it = &mut it; | |
let it = black_box( it.by_ref()); | |
sum = sum_iter(it); | |
sum | |
}, criterion::BatchSize::SmallInput); | |
}); | |
let waker = Waker::noop(); | |
c.bench_function("simple", |b| { | |
let mut sum = 0; | |
let mut ctx = Context::from_waker(&waker); | |
b.iter_batched(|| BufferedByteStream::new() , |it| { | |
let mut fut = pin!(sum_simple(black_box(it))); | |
sum = loop { | |
match fut.as_mut().poll(&mut ctx) { | |
Poll::Pending => { std::thread::yield_now(); } | |
Poll::Ready(val) => break val | |
} | |
}; | |
sum | |
}, criterion::BatchSize::SmallInput); | |
}); | |
c.bench_function("more", |b| { | |
let mut sum = 0; | |
let mut ctx = Context::from_waker(&waker); | |
b.iter_batched(|| BufferedByteStream::new(), |it| { | |
let mut fut = pin!(sum_more(black_box(it))); | |
sum = loop { | |
match fut.as_mut().poll(&mut ctx) { | |
Poll::Pending => { std::thread::yield_now(); } | |
Poll::Ready(val) => break val | |
} | |
}; | |
sum | |
}, criterion::BatchSize::SmallInput); | |
}); | |
} | |
criterion_group!{ | |
name = benches; | |
config = Criterion::default().warm_up_time(Duration::from_millis(500)).measurement_time(Duration::from_secs(2)); | |
targets = criterion_benchmark | |
} | |
criterion_main!(benches); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
iter                    time:   [2.5774 µs 2.5935 µs 2.6124 µs]
                        change: [-48.645% -48.216% -47.743%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 23 outliers among 100 measurements (23.00%)
  16 (16.00%) high mild
  7 (7.00%) high severe

iter by ref             time:   [7.2946 µs 7.3255 µs 7.3747 µs]
                        change: [-1.0041% -0.7966% -0.5203%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe

simple                  time:   [33.895 µs 33.920 µs 33.946 µs]
                        change: [-8.2769% -8.1407% -8.0074%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) low mild
  6 (6.00%) high mild

more                    time:   [33.129 µs 33.175 µs 33.219 µs]
                        change: [-7.8016% -7.4531% -7.1860%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment