Skip to content

Instantly share code, notes, and snippets.

@glittershark
Created August 9, 2023 16:11
Show Gist options
  • Save glittershark/f6c2fe55923749298bfb303c32f5330b to your computer and use it in GitHub Desktop.
Save glittershark/f6c2fe55923749298bfb303c32f5330b to your computer and use it in GitHub Desktop.
tokio broadcast perf
use std::thread;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
use tokio::select;
use tokio::sync::broadcast;
use tokio::time::sleep;
/// Value passed to `broadcast::channel` as the channel capacity in `run_test`.
const BUFFER_SIZE: usize = 64;
/// Subscribes to the broadcast channel behind `tx` and consumes it until the
/// channel is closed.
///
/// The sender handle passed in is released immediately after subscribing, so
/// this task does not keep the channel open by itself. The loop ends once every
/// other `Sender` handle has been dropped and `recv` reports `Closed`.
///
/// Received values and `Lagged` errors are ignored.
async fn recv_loop(tx: Arc<broadcast::Sender<usize>>) {
    let mut rx = tx.subscribe();
    // `recv` can only yield `Closed` after all senders are gone. Keeping `tx`
    // alive for the duration of the loop would make the exit condition below
    // unreachable and leak this task (and the thread driving it) forever.
    drop(tx);

    loop {
        select! {
            // Periodic wake-up with nothing to do; simply go around again.
            _ = sleep(Duration::from_millis(100)) => {}
            res = rx.recv() => {
                // Only channel closure terminates the loop.
                if matches!(res, Err(broadcast::error::RecvError::Closed)) {
                    break;
                }
            }
        }
    }
}
/// Spawns `num_subscribers` receiver threads and times a single `send` on the
/// channel they share.
///
/// Each subscriber runs `recv_loop` on its own OS thread, inside a dedicated
/// current-thread Tokio runtime. Because the subscription itself happens on
/// those threads, this function waits (bounded by a timeout) until the channel
/// reports `num_subscribers` receivers before starting the clock; otherwise the
/// measured `send` could run against fewer receivers than requested.
///
/// Returns the wall-clock time spent in the one `tx.send(0)` call. The result
/// of `send` is deliberately ignored.
///
/// # Panics
///
/// A receiver thread panics if its Tokio runtime cannot be built.
fn run_test(num_subscribers: usize) -> Duration {
    // Upper bound on how long to wait for receivers to subscribe; after this
    // the measurement proceeds with however many receivers exist.
    const SUBSCRIBE_TIMEOUT: Duration = Duration::from_secs(5);

    // The receiver returned alongside the sender is dropped right away; every
    // real receiver is created by `recv_loop` via `subscribe`.
    let (tx, _) = broadcast::channel(BUFFER_SIZE);
    let tx = Arc::new(tx);

    for _ in 0..num_subscribers {
        let tx = Arc::clone(&tx);
        thread::spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .max_blocking_threads(1)
                .build()
                .expect("failed to build current-thread Tokio runtime");
            runtime.block_on(recv_loop(tx));
        });
    }

    // Wait for the spawned threads to subscribe before timing the send.
    // NOTE(review): a subscribed receiver is not necessarily already parked in
    // `recv()`; this only guarantees the receivers exist.
    let deadline = Instant::now() + SUBSCRIBE_TIMEOUT;
    while tx.receiver_count() < num_subscribers && Instant::now() < deadline {
        thread::yield_now();
    }

    let start = Instant::now();
    let _ = tx.send(0);
    start.elapsed()
}
/// Runs the benchmark for every subscriber count in `10..150` and prints one
/// `subscribers,nanoseconds` line per run to stdout.
fn main() {
    (10..150).for_each(|num_subscribers| {
        let nanos = run_test(num_subscribers).as_nanos();
        println!("{num_subscribers},{nanos}");
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment