Skip to content

Instantly share code, notes, and snippets.

@the8472
Created December 17, 2023 22:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save the8472/b16843886c7e1f276ca89e06b39faaec to your computer and use it in GitHub Desktop.
Save the8472/b16843886c7e1f276ca89e06b39faaec to your computer and use it in GitHub Desktop.
async iter perf for small items
#![feature(coroutines)]
#![feature(coroutine_trait)]
#![feature(unchecked_math)]
#![feature(decl_macro)]
#![feature(noop_waker)]
use std::future::Future;
use std::thread::yield_now;
use std::time::Duration;
use std::{task::{Poll, Context, Waker}, pin::{Pin, pin}};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::{Rng, RngCore};
/// Poll-based asynchronous iterator that also exposes a synchronous fast path.
///
/// This is the local trait the benchmarks in this file are written against.
trait AsyncIterator {
    /// Type of the elements handed out by the iterator.
    type Item;

    /// Polls for the next element using the task context `cx`.
    ///
    /// The return type follows the usual poll-stream shape:
    /// `Poll::Ready(Some(item))` for an element, `Poll::Ready(None)` once the
    /// sequence is over, and `Poll::Pending` when the caller has to poll again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Tries to obtain an element without a task context.
    ///
    /// `None` only means that nothing could be handed out synchronously; the
    /// `more` arm of `desugar_loop!` falls back to [`AsyncIterator::poll_next`]
    /// in that case to find out whether the sequence has really ended.
    fn try_more(self: Pin<&mut Self>) -> Option<Self::Item>;
}
/// Byte source backed by a fixed in-memory buffer that can be refilled a
/// limited number of times; used as the input of every benchmark in this file.
struct BufferedByteStream {
    /// Heap-allocated 4096-byte buffer the bytes are read from.
    buf: Box<[u8; 4096]>,
    /// Number of refills that may still be performed before the stream ends.
    remainin_chunks: usize,
    /// Position in `buf` of the next byte to hand out.
    idx: usize,
}
impl BufferedByteStream {
    /// Builds a stream whose buffer is filled with random bytes, positioned at
    /// the start and with three refills available.
    fn new() -> Self {
        let mut stream = Self {
            buf: Box::new([0; 4096]),
            remainin_chunks: 3,
            idx: 0,
        };
        stream.reinit();
        stream
    }

    /// Overwrites the whole buffer with fresh random bytes and rewinds the
    /// stream. Returns `self` so calls can be chained.
    fn reinit(&mut self) -> &mut Self {
        let mut rng = rand::thread_rng();
        rng.fill_bytes(self.buf.as_mut_slice());
        self.rewind();
        self
    }

    /// Moves the read position back to the start of the buffer and restores
    /// the refill budget to three.
    fn rewind(&mut self) {
        self.idx = 0;
        self.remainin_chunks = 3;
    }

    /// Simulates loading the next chunk: one randomly chosen byte is replaced
    /// by a random value, the read position goes back to the start, and one
    /// refill is used up.
    ///
    /// Callers check `remainin_chunks > 0` before calling; the decrement is not
    /// guarded here.
    fn refill(&mut self) {
        let mut rng = rand::thread_rng();
        // Draw the value first and the slot second, in that order.
        let byte: u8 = rng.gen();
        let slot = rng.gen_range(0..self.buf.len());
        self.buf[slot] = byte;
        self.idx = 0;
        self.remainin_chunks -= 1;
    }
}
/// Asynchronous view of the stream: bytes already in the buffer are returned
/// immediately, and every refill is reported as one `Poll::Pending` round trip.
impl AsyncIterator for BufferedByteStream {
    type Item = u8;

    /// Returns the next buffered byte if there is one.
    ///
    /// When the buffer is exhausted and refills remain, the buffer is refilled,
    /// the task is woken and `Poll::Pending` is returned, so the byte becomes
    /// available on the following poll. When no refills remain the stream ends
    /// with `Poll::Ready(None)`.
    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let idx = self.idx;
        if self.buf.len() != idx {
            // SAFETY: `idx` starts at 0 and only ever grows by one while it is
            // different from `buf.len()`, so here `idx < buf.len()` and the
            // increment cannot overflow.
            unsafe { self.as_mut().idx = idx.unchecked_add(1) };
            // SAFETY: `idx < buf.len()` (see above), so the read is in bounds
            // of the live, initialized buffer.
            return Poll::Ready(Some(unsafe { self.buf.as_ptr().add(idx).read() }));
        }
        if self.remainin_chunks > 0 {
            // some "IO"
            self.as_mut().refill();
            // The refill finished synchronously, so nobody else will ever wake
            // this task. Request a re-poll before reporting `Pending`;
            // otherwise an executor that honours wake-ups would park the task
            // forever.
            cx.waker().wake_by_ref();
            // pretend that we're waiting for the stuff that happens above
            return Poll::Pending;
        }
        Poll::Ready(None)
    }

    /// Synchronous fast path: returns the next buffered byte, or `None` when
    /// the buffer is exhausted. Never refills, so `None` does not by itself
    /// mean the stream has ended.
    #[inline]
    fn try_more(mut self: Pin<&mut Self>) -> Option<Self::Item> {
        let idx = self.idx;
        if self.buf.len() != idx {
            // SAFETY: `idx` starts at 0 and only ever grows by one while it is
            // different from `buf.len()`, so here `idx < buf.len()` and the
            // increment cannot overflow.
            unsafe { self.idx = idx.unchecked_add(1) };
            // SAFETY: `idx < buf.len()` (see above), so the read is in bounds
            // of the live, initialized buffer.
            return Some(unsafe { self.buf.as_ptr().add(idx).read() });
        }
        None
    }
}
/// Synchronous view of the stream, used as the baseline in the benchmarks.
impl Iterator for BufferedByteStream {
    type Item = u8;

    /// Returns the next byte, refilling the buffer when it is exhausted and
    /// refills remain; returns `None` once both the buffer and the refill
    /// budget are used up.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let idx = self.idx;
        if self.buf.len() != idx {
            // SAFETY: `idx` starts at 0 and only ever grows by one while it is
            // different from `buf.len()`, so here `idx < buf.len()` and the
            // increment cannot overflow.
            unsafe { self.idx = idx.unchecked_add(1) };
            // SAFETY: `idx < buf.len()` (see above), so the read is in bounds
            // of the live, initialized buffer.
            return Some(unsafe { self.buf.as_ptr().add(idx).read() });
        }
        if self.remainin_chunks > 0 {
            self.refill();
            // The first byte of the new chunk is returned right away.
            self.idx = 1;
            return Some(self.buf[0]);
        }
        None
    }

    /// Folds all remaining bytes chunk by chunk.
    ///
    /// Only the unread part of the current chunk is folded, so bytes that were
    /// already returned by [`Iterator::next`] are not visited a second time.
    /// After each chunk the buffer is refilled (while refills remain) and the
    /// thread yields once, mirroring the `Pending` round trip of the async
    /// version.
    #[inline]
    fn fold<B, F>(mut self, init: B, mut f: F) -> B
    where
        Self: Sized,
        F: FnMut(B, Self::Item) -> B,
    {
        let mut acc = init;
        loop {
            // Start at `idx`, not at 0: everything before it has been consumed.
            // `refill` resets `idx` to 0, so later chunks are folded in full.
            acc = self.buf[self.idx..].iter().copied().fold(acc, &mut f);
            if self.remainin_chunks > 0 {
                // some "IO"
                self.refill();
                // parity to the async version
                yield_now();
            } else {
                break;
            }
        }
        acc
    }
}
/// Adds up every byte produced by `stream` using the `simple` loop
/// desugaring, i.e. one awaited `poll_next` per item.
///
/// The addition wraps on overflow.
async fn sum_simple(stream: impl AsyncIterator<Item=u8>) -> usize {
    let mut total = 0usize;
    // The macro calls `.as_mut()` on the pinned binding for every item.
    let mut pinned = pin!(stream);
    desugar_loop!(for simple byte in pinned => { total = total.wrapping_add(usize::from(byte)); });
    total
}
/// Adds up every byte produced by `stream` using the `more` loop desugaring,
/// which drains `try_more` synchronously and only awaits `poll_next` when that
/// returns `None`.
///
/// The addition wraps on overflow.
async fn sum_more(stream: impl AsyncIterator<Item=u8>) -> usize {
    let mut total = 0usize;
    let mut pinned = pin!(stream);
    // The iterator expression is re-evaluated at each use inside the macro, so
    // pass a fresh reborrow rather than the binding itself.
    desugar_loop!(for more byte in pinned.as_mut() => { total = total.wrapping_add(usize::from(byte)); });
    total
}
/// Adds up every byte of the synchronous iterator `it`, widened to `usize`.
fn sum_iter(it: impl Iterator<Item=u8>) -> usize {
    it.fold(0, |total, byte| total + usize::from(byte))
}
// Hand-written expansions of a `for` loop over an `AsyncIterator`, so that the
// two strategies can be benchmarked against each other. Both arms contain an
// `.await` and therefore have to be invoked inside an async context.
//
// Syntax: `desugar_loop!(for <simple|more> <binding> in <iterator expr> => { <body> })`
macro desugar_loop {
// `simple`: every item is obtained by awaiting `poll_next` through `poll_fn`.
// `.as_mut()` is called on `$it` for each item, so `$it` has to be a mutable
// pinned binding (as in `sum_simple`).
(for simple $i:ident in $it: expr => $body:block) => {
use std::future::poll_fn;
while let Some($i) = poll_fn(|cx| $it.as_mut().poll_next(cx)).await {
$body
}
},
// `more`: first take items synchronously via `try_more`; only when that
// returns `None` fall back to awaiting `poll_next`, whose `None` ends the loop.
// `$it` is pasted in at each use and passed by value to both methods, so the
// caller supplies an expression that yields a fresh pinned reference each time
// (`stream.as_mut()` in `sum_more`). `$body` is expanded twice.
(for more $i:ident in $it: expr => $body:block) => {
use std::future::poll_fn;
loop {
// Fast path: no future, no context.
if let Some($i) = $it.try_more() {
$body
continue;
}
// Slow path: nothing available synchronously, go through the poll machinery.
if let Some($i) = poll_fn(|cx| $it.poll_next(cx)).await {
$body
} else {
break
}
}
}
}
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("iter", |b| {
let mut sum = 0;
b.iter_batched(|| BufferedByteStream::new(), |it| {
let it = black_box( it);
sum = sum_iter(it);
sum
}, criterion::BatchSize::SmallInput);
});
c.bench_function("iter by ref", |b| {
let mut sum = 0;
b.iter_batched_ref(|| BufferedByteStream::new(), |mut it| {
let mut it = &mut it;
let it = black_box( it.by_ref());
sum = sum_iter(it);
sum
}, criterion::BatchSize::SmallInput);
});
let waker = Waker::noop();
c.bench_function("simple", |b| {
let mut sum = 0;
let mut ctx = Context::from_waker(&waker);
b.iter_batched(|| BufferedByteStream::new() , |it| {
let mut fut = pin!(sum_simple(black_box(it)));
sum = loop {
match fut.as_mut().poll(&mut ctx) {
Poll::Pending => { std::thread::yield_now(); }
Poll::Ready(val) => break val
}
};
sum
}, criterion::BatchSize::SmallInput);
});
c.bench_function("more", |b| {
let mut sum = 0;
let mut ctx = Context::from_waker(&waker);
b.iter_batched(|| BufferedByteStream::new(), |it| {
let mut fut = pin!(sum_more(black_box(it)));
sum = loop {
match fut.as_mut().poll(&mut ctx) {
Poll::Pending => { std::thread::yield_now(); }
Poll::Ready(val) => break val
}
};
sum
}, criterion::BatchSize::SmallInput);
});
}
// Benchmark group `benches`: runs `criterion_benchmark` with a 500 ms warm-up
// and a 2 s measurement window instead of criterion's default configuration.
criterion_group!{
name = benches;
config = Criterion::default().warm_up_time(Duration::from_millis(500)).measurement_time(Duration::from_secs(2));
targets = criterion_benchmark
}
// Hands the `benches` group to criterion's entry-point macro.
criterion_main!(benches);
iter time: [2.5774 µs 2.5935 µs 2.6124 µs]
change: [-48.645% -48.216% -47.743%] (p = 0.00 < 0.05)
Performance has improved.
Found 23 outliers among 100 measurements (23.00%)
16 (16.00%) high mild
7 (7.00%) high severe
iter by ref time: [7.2946 µs 7.3255 µs 7.3747 µs]
change: [-1.0041% -0.7966% -0.5203%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
3 (3.00%) low mild
3 (3.00%) high mild
4 (4.00%) high severe
simple time: [33.895 µs 33.920 µs 33.946 µs]
change: [-8.2769% -8.1407% -8.0074%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
2 (2.00%) low mild
6 (6.00%) high mild
more time: [33.129 µs 33.175 µs 33.219 µs]
change: [-7.8016% -7.4531% -7.1860%] (p = 0.00 < 0.05)
Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment