Skip to content

Instantly share code, notes, and snippets.

@esemeniuc
Last active January 23, 2023 19:30
Show Gist options
  • Save esemeniuc/caf0bba5f87cc90c2e5a10bb9e554b11 to your computer and use it in GitHub Desktop.
Save esemeniuc/caf0bba5f87cc90c2e5a10bb9e554b11 to your computer and use it in GitHub Desktop.
Async vs sync nats

System: Ryzen 7 5900X, 64GB ram, linux 5.15, Nats server 2.9.10 in docker All results tested in release mode, rustc 1.66.0

Async

async-nats 0.26, tokio 1.14

total time spent (micro secs): 176134
avg (micro secs): 1761

async-nats 0.26, tokio 1.24.1

total time spent (micro secs): 176074
avg (micro secs): 1760

Sync

nats 0.23.1 (non flush)

total time spent (micro secs): 32676
avg (micro secs): 326

nats 0.23.1 (flush)

total time spent (micro secs): 32139
avg (micro secs): 321

Flush code:

        client.publish("latency-testing", &i.to_string()).unwrap();
        client.flush().unwrap();
        subscriber.next().unwrap();

Source code originnally from nats-io/nats.rs#573

Source code:

use std::time::{Duration, Instant};

use futures_util::StreamExt;

#[tokio::main]
async fn main(){
    sync_main();
    async_main().await;
}

fn sync_main() {
    let client = nats::connect("localhost:4222").unwrap();
    let subscriber = client.subscribe("latency-testing").unwrap();
    let mut publish_sum = 0u128;
    let mut flush_sum = 0u128;
    let mut fetch_sum = 0u128;
    for i in 0..100 {
        let published_at = Instant::now();
        client.publish("latency-testing", &i.to_string()).unwrap();
        publish_sum += published_at.elapsed().as_micros();

        let flush_at = Instant::now();
        client.flush().unwrap();
        flush_sum += flush_at.elapsed().as_micros();

        let fetch_at = Instant::now();
        subscriber.next().unwrap();
        fetch_sum += fetch_at.elapsed().as_micros();

        std::thread::sleep(Duration::from_millis(200));
    }

    println!("sync publish (μs): {}", publish_sum);
    println!("sync flush (μs): {}", flush_sum);
    println!("sync fetch (μs): {}", fetch_sum);
    let sum = publish_sum + flush_sum + fetch_sum;
    println!("sync total time spent (μs): {}", sum);
    println!("sync avg (μs): {}", sum / 100);
}

async fn async_main() {
    let client = async_nats::connect("localhost:4222").await.unwrap();
    let mut subscriber = client.subscribe("latency-testing".to_string()).await.unwrap();
    let mut publish_sum = 0u128;
    let mut flush_sum = 0u128;
    let mut fetch_sum = 0u128;
    for i in 0..100 {
        let published_at = Instant::now();
        client.publish("latency-testing".to_string(), i.to_string().into()).await.unwrap();
        publish_sum += published_at.elapsed().as_micros();

        let flush_at = Instant::now();
        client.flush().await.unwrap();
        flush_sum += flush_at.elapsed().as_micros();

        let fetch_at = Instant::now();
        subscriber.next().await.unwrap();
        fetch_sum += fetch_at.elapsed().as_micros();

        tokio::time::sleep(Duration::from_millis(200)).await;
    }

    println!("async publish (μs): {}", publish_sum);
    println!("async flush (μs): {}", flush_sum);
    println!("async fetch (μs): {}", fetch_sum);
    let sum = publish_sum + flush_sum + fetch_sum;
    println!("async total time spent (μs): {}", sum);
    println!("async avg (μs): {}", sum / 100);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment