Skip to content

Instantly share code, notes, and snippets.

@esemeniuc
Last active May 30, 2023 19:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save esemeniuc/2a555c324903410bc3323bb34749e834 to your computer and use it in GitHub Desktop.
Save esemeniuc/2a555c324903410bc3323bb34749e834 to your computer and use it in GitHub Desktop.
Read contention Benchmark for Tokio async vs arc_swap vs RwLock vs parking_lot
/* results on 24 threads, Linux, Ryzen 5900X, 64GB RAM, rust 1.66 release mode
mutex st: 3.504602ms
mutex mt 24 threads: 966.566131ms
rwlock st: 4.115984ms
rwlock mt 24 threads: 1.639819526s
parking_lot st: 4.255349ms
parking_lot mt 24 threads: 1.336455132s
try channel st: 18.446771ms
try channel mt 24 threads: 3.152402156s
new channel st: 45.410769ms
new channel mt 24 threads: 3.625760775s
arcswap st: 3.785133ms
arcswap mt 24 threads: 12.61671ms
arcswap st fallback: 7.304135ms
arcswap mt fallback 24 threads: 791.850477ms
tokio st: 24.056378ms
tokio mt 24 threads: 18.930533683s
arc-swap = "1.6.0"
futures-util = "0.3.28"
parking_lot = "0.12.1"
tokio = { version = "~1.14.1", features = ["rt-multi-thread", "macros", "sync", "time", "full", "rt"] }
crossbeam-channel = "0.5.8"
*/
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::thread::JoinHandle;
use std::time::Instant;
use arc_swap::ArcSwap;
use futures_util::future::join_all;
#[tokio::main]
async fn main() {
benchmark_mutex();
benchmark_rwlock();
benchmark_parking_lot();
benchmark_try_channel();
benchmark_new_channel();
benchmark_atomic();
benchmark_atomic_fallback();
benchmark_tokio().await;
}
fn benchmark_mutex() {
//single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(Mutex::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.lock().unwrap();
sum += arr[i % 90].len();
}
// ===================================
println!("mutex st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
thread::spawn(move || {
for i in 0..1_000_000 {
let arr = storage.lock().unwrap();
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x == sum));
println!("mutex mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_rwlock() {
//single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(RwLock::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.read().unwrap();
sum += arr[i % 90].len();
}
// ===================================
println!("rwlock st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
thread::spawn(move || {
for i in 0..1_000_000 {
let arr = storage.read().unwrap();
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x == sum));
println!("rwlock mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_parking_lot() {
//single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(parking_lot::RwLock::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.read();
sum += arr[i % 90].len();
}
// ===================================
println!("parking_lot st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
thread::spawn(move || {
for i in 0..1_000_000 {
let arr = storage.read();
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x == sum));
println!("parking_lot mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_try_channel() {
let (s, r) = crossbeam_channel::unbounded();
//single threaded
(0..25_000_000).for_each(|i| s.send(i.to_string()).unwrap());
// let storage = Arc::new(parking_lot::RwLock::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let result = r.try_recv().unwrap();
sum += result.len();
}
// ===================================
println!("try channel st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let mut sum = 0;
let r = r.clone();
thread::spawn(move || {
for i in 0..1_000_000 {
let result = r.try_recv().unwrap();
sum += result.len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x > 0));
println!("try channel mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_new_channel() {
let (s, r) = crossbeam_channel::unbounded();
//single threaded
(0..25_000_000).for_each(|i| s.send(i.to_string()).unwrap());
// let storage = Arc::new(parking_lot::RwLock::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let result = r.recv().unwrap();
sum += result.len();
}
// ===================================
println!("new channel st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let mut sum = 0;
let r = r.clone();
thread::spawn(move || {
for i in 0..1_000_000 {
let result = r.recv().unwrap();
sum += result.len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x > 0));
println!("new channel mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_atomic() {
// single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(ArcSwap::from_pointee(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.load();
sum += arr[i % 90].len();
}
// ===================================
println!("arcswap st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
thread::spawn(move || {
for i in 0..1_000_000 {
let arr = storage.load();
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x == sum));
println!("arcswap mt {threads} threads: {:?}", now.elapsed());
}
fn benchmark_atomic_fallback() {
// single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(ArcSwap::from_pointee(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.load_full();
sum += arr[i % 90].len();
}
// ===================================
println!("arcswap st fallback: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
thread::spawn(move || {
for i in 0..1_000_000 {
let arr = storage.load_full();
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<JoinHandle<_>>>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::<Vec<_>>();
assert!(results.into_iter().all(|x| x == sum));
println!("arcswap mt fallback {threads} threads: {:?}", now.elapsed());
}
async fn benchmark_tokio() {
//single threaded
let mut v = vec![];
(0..100i32).into_iter().for_each(|i| v.push(i.to_string()));
let storage = Arc::new(tokio::sync::RwLock::new(v));
let mut sum = 0;
let now = Instant::now();
// ===================================
for i in 0..1_000_000 {
let arr = storage.read().await;
sum += arr[i % 90].len();
}
// ===================================
println!("tokio st: {:?}", now.elapsed());
//multithreaded case
let threads = usize::from(std::thread::available_parallelism().unwrap());
let now = Instant::now();
let handles = (0..threads)
.map(|_t_id| {
let storage = storage.clone();
let mut sum = 0;
tokio::spawn(async move {
for i in 0..1_000_000 {
let arr = storage.read().await;
sum += arr[i % 90].len();
}
sum
})
})
.collect::<Vec<_>>();
let results = join_all(handles).await;
assert!(results.into_iter().map(|x| x.unwrap()).all(|x| x == sum));
println!("tokio mt {threads} threads: {:?}", now.elapsed());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment