Skip to content

Instantly share code, notes, and snippets.

@musicq
Forked from lu4nm3/main.rs
Created December 8, 2023 04:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save musicq/26232e114672e6c87609b06ce0c5d8d2 to your computer and use it in GitHub Desktop.
Save musicq/26232e114672e6c87609b06ce0c5d8d2 to your computer and use it in GitHub Desktop.
Tokio Async: Concurrent vs Parallel
use futures::StreamExt;
use std::error::Error;
use tokio;
use tokio::macros::support::Pin;
use tokio::prelude::*;
use tokio::time::{Duration, Instant};
/// Entry point: builds a multi-threaded Tokio (0.2) runtime with 10 worker
/// threads and drives the parallel demo to completion.
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The thread pool is what allows spawned tasks to run in parallel; the
    // thread name ("multi-threaded") shows up in the demo output below.
    let mut runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .core_threads(10)
        .max_threads(10)
        .thread_name("multi-threaded")
        .build()?;

    // Swap in `concurrent()` to see the single-task (non-parallel) variant.
    // runtime.block_on(concurrent());
    runtime.block_on(concurrentAndParallel());

    Ok(())
}
// As can be seen from the output below, despite specifying a concurrency of 3, we are still bound
// by the fact that all of the futures generated by this function execute within a single task.
async fn concurrent() {
let before = Instant::now();
let paths = (0..6).rev();
let fetches = futures::stream::iter(paths.into_iter().map(|path| make_request(path)))
.buffer_unordered(3)
.map(|r| println!("finished request: {}", r))
.collect::<Vec<_>>();
fetches.await;
println!("elapsed time: {:.2?}", before.elapsed());
}
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 5
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 4
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 3
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 2
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 1
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 0
// elapsed time: 15.01s
// Here, we wrap every future within its own task using tokio::spawn. This allows the "requests" to
// execute in parallel (depending on how many threads the runtime is configured with; 10 in this
// case) using the multiplexing that Tokio does between different tasks and threads. You can see
// from the output how 3 threads with ids 9, 10, and 11 are consistently used to execute all of
// the 6 "requests".
/// Same six fake requests, but each one is wrapped in its own task via
/// `tokio::spawn`, so the runtime can schedule them on different worker
/// threads. With 3 in flight at a time the total drops to ~5s.
async fn concurrentAndParallel() {
    let start = Instant::now();

    // Spawn one task per request; each JoinHandle is itself a future that
    // resolves to Result<String, JoinError>.
    let handles = (0..6).rev().map(|duration| tokio::spawn(make_request(duration)));

    futures::stream::iter(handles)
        .buffer_unordered(3)
        .map(|joined| {
            // A JoinError (panicked or cancelled task) is reported as "Bad".
            let message = match joined {
                Ok(body) => body,
                Err(_) => String::from("Bad"),
            };
            println!("finished request: {}", message);
        })
        .collect::<Vec<_>>()
        .await;

    println!("elapsed time: {:.2?}", start.elapsed());
}
// started request
// started request
// started request
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 3
// started request
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 4
// started request
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 5
// started request
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 0
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 1
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 2
// elapsed time: 5.01s
/// Simulates a request that takes `sleep` seconds, then returns a string
/// reporting which runtime thread it ran on and for how long it "worked".
///
/// NOTE: `std::thread::sleep` (not `tokio::time::sleep`) is used on purpose:
/// it blocks the executor thread, which is exactly what lets this demo
/// distinguish single-task concurrency (sleeps serialize) from multi-task
/// parallelism (sleeps overlap across worker threads).
async fn make_request(sleep: u64) -> String {
    println!("started request");
    std::thread::sleep(Duration::from_secs(sleep));
    format!(
        "current thread {:?} | thread name {} | request_duration {:?}",
        std::thread::current().id(),
        // `name()` returns Option<&str>; unnamed threads get a placeholder.
        // `unwrap_or` replaces the original `get_or_insert`, which needlessly
        // took a mutable borrow of a temporary Option just to read it.
        std::thread::current().name().unwrap_or("default_thread_name"),
        sleep
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment