System: Ryzen 7 5900X, 64GB ram, linux 5.15, Nats server 2.9.10 in docker All results tested in release mode, rustc 1.66.0
total time spent (micro secs): 176134
avg (micro secs): 1761
total time spent (micro secs): 176074
avg (micro secs): 1760
total time spent (micro secs): 32676
avg (micro secs): 326
total time spent (micro secs): 32139
avg (micro secs): 321
Flush code:
client.publish("latency-testing", &i.to_string()).unwrap();
client.flush().unwrap();
subscriber.next().unwrap();
Source code originnally from nats-io/nats.rs#573
use std::time::{Duration, Instant};
use futures_util::StreamExt;
#[tokio::main]
async fn main(){
sync_main();
async_main().await;
}
fn sync_main() {
let client = nats::connect("localhost:4222").unwrap();
let subscriber = client.subscribe("latency-testing").unwrap();
let mut publish_sum = 0u128;
let mut flush_sum = 0u128;
let mut fetch_sum = 0u128;
for i in 0..100 {
let published_at = Instant::now();
client.publish("latency-testing", &i.to_string()).unwrap();
publish_sum += published_at.elapsed().as_micros();
let flush_at = Instant::now();
client.flush().unwrap();
flush_sum += flush_at.elapsed().as_micros();
let fetch_at = Instant::now();
subscriber.next().unwrap();
fetch_sum += fetch_at.elapsed().as_micros();
std::thread::sleep(Duration::from_millis(200));
}
println!("sync publish (μs): {}", publish_sum);
println!("sync flush (μs): {}", flush_sum);
println!("sync fetch (μs): {}", fetch_sum);
let sum = publish_sum + flush_sum + fetch_sum;
println!("sync total time spent (μs): {}", sum);
println!("sync avg (μs): {}", sum / 100);
}
async fn async_main() {
let client = async_nats::connect("localhost:4222").await.unwrap();
let mut subscriber = client.subscribe("latency-testing".to_string()).await.unwrap();
let mut publish_sum = 0u128;
let mut flush_sum = 0u128;
let mut fetch_sum = 0u128;
for i in 0..100 {
let published_at = Instant::now();
client.publish("latency-testing".to_string(), i.to_string().into()).await.unwrap();
publish_sum += published_at.elapsed().as_micros();
let flush_at = Instant::now();
client.flush().await.unwrap();
flush_sum += flush_at.elapsed().as_micros();
let fetch_at = Instant::now();
subscriber.next().await.unwrap();
fetch_sum += fetch_at.elapsed().as_micros();
tokio::time::sleep(Duration::from_millis(200)).await;
}
println!("async publish (μs): {}", publish_sum);
println!("async flush (μs): {}", flush_sum);
println!("async fetch (μs): {}", fetch_sum);
let sum = publish_sum + flush_sum + fetch_sum;
println!("async total time spent (μs): {}", sum);
println!("async avg (μs): {}", sum / 100);
}