Last active
May 30, 2023 19:26
-
-
Save esemeniuc/2a555c324903410bc3323bb34749e834 to your computer and use it in GitHub Desktop.
Read contention Benchmark for Tokio async vs arc_swap vs RwLock vs parking_lot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Results on 24 threads, Linux, Ryzen 5900X, 64GB RAM, rust 1.66, release mode:

mutex st: 3.504602ms
mutex mt 24 threads: 966.566131ms
rwlock st: 4.115984ms
rwlock mt 24 threads: 1.639819526s
parking_lot st: 4.255349ms
parking_lot mt 24 threads: 1.336455132s
try channel st: 18.446771ms
try channel mt 24 threads: 3.152402156s
new channel st: 45.410769ms
new channel mt 24 threads: 3.625760775s
arcswap st: 3.785133ms
arcswap mt 24 threads: 12.61671ms
arcswap st fallback: 7.304135ms
arcswap mt fallback 24 threads: 791.850477ms
tokio st: 24.056378ms
tokio mt 24 threads: 18.930533683s

Cargo.toml dependencies used for the run above:

arc-swap = "1.6.0"
futures-util = "0.3.28"
parking_lot = "0.12.1"
tokio = { version = "~1.14.1", features = ["rt-multi-thread", "macros", "sync", "time", "full", "rt"] }
crossbeam-channel = "0.5.8"
*/
use std::sync::{Arc, Mutex, RwLock}; | |
use std::thread; | |
use std::thread::JoinHandle; | |
use std::time::Instant; | |
use arc_swap::ArcSwap; | |
use futures_util::future::join_all; | |
#[tokio::main] | |
async fn main() { | |
benchmark_mutex(); | |
benchmark_rwlock(); | |
benchmark_parking_lot(); | |
benchmark_try_channel(); | |
benchmark_new_channel(); | |
benchmark_atomic(); | |
benchmark_atomic_fallback(); | |
benchmark_tokio().await; | |
} | |
/// Times reads of a shared `Vec<String>` guarded by `std::sync::Mutex`.
///
/// Two timings are printed to stdout:
/// 1. 1,000,000 lock-and-index operations on the calling thread.
/// 2. The same loop run concurrently on one OS thread per unit of available
///    parallelism (thread spawn and join are included in the measurement).
///
/// # Panics
/// Panics if the available parallelism cannot be queried, if the mutex is
/// poisoned, if a worker thread panics, or if any worker's total differs from
/// the single-threaded total.
fn benchmark_mutex() {
    // Shared payload: the decimal strings "0" through "99".
    let strings: Vec<String> = (0..100i32).map(|n| n.to_string()).collect();
    let shared = Arc::new(Mutex::new(strings));

    // --- single-threaded baseline ---
    let mut expected = 0;
    let started = Instant::now();
    for idx in 0..1_000_000 {
        let guard = shared.lock().unwrap();
        expected += guard[idx % 90].len();
    }
    println!("mutex st: {:?}", started.elapsed());

    // --- contended run ---
    let threads = thread::available_parallelism().unwrap().get();
    let started = Instant::now();
    let mut workers: Vec<JoinHandle<usize>> = Vec::with_capacity(threads);
    for _ in 0..threads {
        let shared = Arc::clone(&shared);
        workers.push(thread::spawn(move || {
            let mut local = 0;
            for idx in 0..1_000_000 {
                let guard = shared.lock().unwrap();
                local += guard[idx % 90].len();
            }
            local
        }));
    }

    // Join every worker before checking, so the timing covers all of them.
    let mut totals = Vec::with_capacity(workers.len());
    for worker in workers {
        totals.push(worker.join().unwrap());
    }
    assert!(totals.iter().all(|&total| total == expected));
    println!("mutex mt {threads} threads: {:?}", started.elapsed());
}
/// Times reads of a shared `Vec<String>` guarded by `std::sync::RwLock`.
///
/// Two timings are printed to stdout:
/// 1. 1,000,000 read-lock-and-index operations on the calling thread.
/// 2. The same loop run concurrently on one OS thread per unit of available
///    parallelism (thread spawn and join are included in the measurement).
///
/// # Panics
/// Panics if the available parallelism cannot be queried, if the lock is
/// poisoned, if a worker thread panics, or if any worker's total differs from
/// the single-threaded total.
fn benchmark_rwlock() {
    // Shared payload: the decimal strings "0" through "99".
    let strings: Vec<String> = (0..100i32).map(|n| n.to_string()).collect();
    let shared = Arc::new(RwLock::new(strings));

    // --- single-threaded baseline ---
    let mut expected = 0;
    let started = Instant::now();
    for idx in 0..1_000_000 {
        let guard = shared.read().unwrap();
        expected += guard[idx % 90].len();
    }
    println!("rwlock st: {:?}", started.elapsed());

    // --- contended run ---
    let threads = thread::available_parallelism().unwrap().get();
    let started = Instant::now();
    let mut workers: Vec<JoinHandle<usize>> = Vec::with_capacity(threads);
    for _ in 0..threads {
        let shared = Arc::clone(&shared);
        workers.push(thread::spawn(move || {
            let mut local = 0;
            for idx in 0..1_000_000 {
                let guard = shared.read().unwrap();
                local += guard[idx % 90].len();
            }
            local
        }));
    }

    // Join every worker before checking, so the timing covers all of them.
    let mut totals = Vec::with_capacity(workers.len());
    for worker in workers {
        totals.push(worker.join().unwrap());
    }
    assert!(totals.iter().all(|&total| total == expected));
    println!("rwlock mt {threads} threads: {:?}", started.elapsed());
}
fn benchmark_parking_lot() { | |
//single threaded | |
let mut v = vec![]; | |
(0..100i32).into_iter().for_each(|i| v.push(i.to_string())); | |
let storage = Arc::new(parking_lot::RwLock::new(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let arr = storage.read(); | |
sum += arr[i % 90].len(); | |
} | |
// =================================== | |
println!("parking_lot st: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let storage = storage.clone(); | |
let mut sum = 0; | |
thread::spawn(move || { | |
for i in 0..1_000_000 { | |
let arr = storage.read(); | |
sum += arr[i % 90].len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<JoinHandle<_>>>(); | |
let results = handles | |
.into_iter() | |
.map(|handle| handle.join().unwrap()) | |
.collect::<Vec<_>>(); | |
assert!(results.into_iter().all(|x| x == sum)); | |
println!("parking_lot mt {threads} threads: {:?}", now.elapsed()); | |
} | |
fn benchmark_try_channel() { | |
let (s, r) = crossbeam_channel::unbounded(); | |
//single threaded | |
(0..25_000_000).for_each(|i| s.send(i.to_string()).unwrap()); | |
// let storage = Arc::new(parking_lot::RwLock::new(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let result = r.try_recv().unwrap(); | |
sum += result.len(); | |
} | |
// =================================== | |
println!("try channel st: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let mut sum = 0; | |
let r = r.clone(); | |
thread::spawn(move || { | |
for i in 0..1_000_000 { | |
let result = r.try_recv().unwrap(); | |
sum += result.len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<JoinHandle<_>>>(); | |
let results = handles | |
.into_iter() | |
.map(|handle| handle.join().unwrap()) | |
.collect::<Vec<_>>(); | |
assert!(results.into_iter().all(|x| x > 0)); | |
println!("try channel mt {threads} threads: {:?}", now.elapsed()); | |
} | |
fn benchmark_new_channel() { | |
let (s, r) = crossbeam_channel::unbounded(); | |
//single threaded | |
(0..25_000_000).for_each(|i| s.send(i.to_string()).unwrap()); | |
// let storage = Arc::new(parking_lot::RwLock::new(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let result = r.recv().unwrap(); | |
sum += result.len(); | |
} | |
// =================================== | |
println!("new channel st: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let mut sum = 0; | |
let r = r.clone(); | |
thread::spawn(move || { | |
for i in 0..1_000_000 { | |
let result = r.recv().unwrap(); | |
sum += result.len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<JoinHandle<_>>>(); | |
let results = handles | |
.into_iter() | |
.map(|handle| handle.join().unwrap()) | |
.collect::<Vec<_>>(); | |
assert!(results.into_iter().all(|x| x > 0)); | |
println!("new channel mt {threads} threads: {:?}", now.elapsed()); | |
} | |
fn benchmark_atomic() { | |
// single threaded | |
let mut v = vec![]; | |
(0..100i32).into_iter().for_each(|i| v.push(i.to_string())); | |
let storage = Arc::new(ArcSwap::from_pointee(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let arr = storage.load(); | |
sum += arr[i % 90].len(); | |
} | |
// =================================== | |
println!("arcswap st: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let storage = storage.clone(); | |
let mut sum = 0; | |
thread::spawn(move || { | |
for i in 0..1_000_000 { | |
let arr = storage.load(); | |
sum += arr[i % 90].len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<JoinHandle<_>>>(); | |
let results = handles | |
.into_iter() | |
.map(|handle| handle.join().unwrap()) | |
.collect::<Vec<_>>(); | |
assert!(results.into_iter().all(|x| x == sum)); | |
println!("arcswap mt {threads} threads: {:?}", now.elapsed()); | |
} | |
fn benchmark_atomic_fallback() { | |
// single threaded | |
let mut v = vec![]; | |
(0..100i32).into_iter().for_each(|i| v.push(i.to_string())); | |
let storage = Arc::new(ArcSwap::from_pointee(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let arr = storage.load_full(); | |
sum += arr[i % 90].len(); | |
} | |
// =================================== | |
println!("arcswap st fallback: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let storage = storage.clone(); | |
let mut sum = 0; | |
thread::spawn(move || { | |
for i in 0..1_000_000 { | |
let arr = storage.load_full(); | |
sum += arr[i % 90].len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<JoinHandle<_>>>(); | |
let results = handles | |
.into_iter() | |
.map(|handle| handle.join().unwrap()) | |
.collect::<Vec<_>>(); | |
assert!(results.into_iter().all(|x| x == sum)); | |
println!("arcswap mt fallback {threads} threads: {:?}", now.elapsed()); | |
} | |
async fn benchmark_tokio() { | |
//single threaded | |
let mut v = vec![]; | |
(0..100i32).into_iter().for_each(|i| v.push(i.to_string())); | |
let storage = Arc::new(tokio::sync::RwLock::new(v)); | |
let mut sum = 0; | |
let now = Instant::now(); | |
// =================================== | |
for i in 0..1_000_000 { | |
let arr = storage.read().await; | |
sum += arr[i % 90].len(); | |
} | |
// =================================== | |
println!("tokio st: {:?}", now.elapsed()); | |
//multithreaded case | |
let threads = usize::from(std::thread::available_parallelism().unwrap()); | |
let now = Instant::now(); | |
let handles = (0..threads) | |
.map(|_t_id| { | |
let storage = storage.clone(); | |
let mut sum = 0; | |
tokio::spawn(async move { | |
for i in 0..1_000_000 { | |
let arr = storage.read().await; | |
sum += arr[i % 90].len(); | |
} | |
sum | |
}) | |
}) | |
.collect::<Vec<_>>(); | |
let results = join_all(handles).await; | |
assert!(results.into_iter().map(|x| x.unwrap()).all(|x| x == sum)); | |
println!("tokio mt {threads} threads: {:?}", now.elapsed()); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment