Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Last active May 10, 2024 02:54
Show Gist options
  • Save mooreniemi/4c7dc474a38d8a2055af3381ac3f8992 to your computer and use it in GitHub Desktop.
Save mooreniemi/4c7dc474a38d8a2055af3381ac3f8992 to your computer and use it in GitHub Desktop.
# Manifest for the `poodah` crate.
[package]
name = "poodah"
version = "0.1.0"
edition = "2021"
# Benchmark target `ray_in_tok` (presumably `benches/ray_in_tok.rs` - confirm).
# `harness = false` turns off the default libtest bench harness so the benchmark
# file can supply its own entry point (here, Criterion's `criterion_main!`).
[[bench]]
name = "ray_in_tok"
harness = false
[dependencies]
# NOTE(review): arrow, parquet and the rusoto crates are not referenced by the
# benchmark source shown alongside this manifest; presumably they are used
# elsewhere in the crate - confirm before removing.
arrow = "25.0.0"
parquet = "25.0.0"
rusoto_core = "0.48.0"
rusoto_mock = "0.48.0"
rusoto_s3 = "0.48.0"
tokio = { version = "1.21.2", features = ["full"] }
# NOTE(review): `*` accepts any published version, so resolution depends entirely
# on the lockfile; consider pinning these to a concrete version range.
mockall = "*"
bytes = "*"
url = "*"
rayon = "*"
tokio-metrics = { version = "0.3", features = ["rt"] }
[dev-dependencies]
# The `async_tokio` feature is what allows benchmarks to call `to_async` with a
# Tokio runtime.
criterion = { version = "0.5", features = ["async_tokio", "async_futures"] }
parameterized = "*"
# Maximum-optimization settings: a single codegen unit plus full LTO trade
# compile time for more cross-function optimization. `cargo bench` uses the
# `bench` profile, which inherits from `release` by default.
[profile.release]
codegen-units = 1
lto = true
opt-level = 3
#![allow(unused)]
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use tokio::runtime::Runtime;
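// NOTE: this was written inside a Cargo workspace, so Criterion's output lands in
// the workspace-level `target` directory (one level up). To view a report, run e.g.:
//   open ../target/criterion/CPU\ Task\ Size\ 100000000/report/index.html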
/// Sums the squares of `0..size` inline, with no offloading to another thread.
///
/// This is the baseline workload: the body contains no `.await`, so the future
/// does all of its work in a single poll on whichever thread polls it.
///
/// The arithmetic is explicitly wrapping. The true sum exceeds `i32::MAX` once
/// `size` passes a couple of thousand, and plain `+`/`*` would panic in builds
/// with overflow checks enabled (e.g. debug) while silently wrapping in release.
/// The wrapping methods produce the release-mode result in every build profile.
async fn cpu_intensive_task(size: usize) -> i32 {
    // `as i32` deliberately keeps only the low 32 bits of each square.
    black_box((0..size).fold(0i32, |acc, x| acc.wrapping_add(x.wrapping_mul(x) as i32)))
}
/// Runs the sum-of-squares workload via `tokio::task::spawn_blocking`.
///
/// The computation is moved into a closure handed to Tokio's blocking pool, and
/// this future simply awaits the resulting `JoinHandle`.
///
/// The arithmetic is explicitly wrapping: the true sum exceeds `i32::MAX` for all
/// but small sizes, and plain `+`/`*` would panic in builds with overflow checks
/// enabled while silently wrapping in release.
///
/// # Panics
///
/// Panics if the blocking task panicked or was cancelled before completing.
async fn cpu_intensive_task_spawn_blocking(size: usize) -> i32 {
    tokio::task::spawn_blocking(move || {
        // `as i32` deliberately keeps only the low 32 bits of each square.
        black_box((0..size).fold(0i32, |acc, x| acc.wrapping_add(x.wrapping_mul(x) as i32)))
    })
    .await
    .expect("spawn_blocking task panicked or was cancelled")
}
/// Runs a Rayon-parallel sum of squares from inside `tokio::task::spawn_blocking`.
///
/// The closure first materializes `0..size` into a `Vec`, then squares and sums
/// the elements with `par_iter`. The vector is built inside the measured call, so
/// its allocation is part of what gets timed.
///
/// The reduction uses wrapping addition: the true sum exceeds `i32::MAX` for all
/// but small sizes, and an overflow-checked `sum()` would panic in debug-style
/// builds. Wrapping addition is associative and commutative, so the result does
/// not depend on how Rayon splits the work.
///
/// # Panics
///
/// Panics if the blocking task panicked or was cancelled before completing.
async fn cpu_intensive_task_with_rayon_in_spawn_blocking(size: usize) -> i32 {
    tokio::task::spawn_blocking(move || {
        let data: Vec<usize> = black_box((0..size).collect());
        black_box(
            data.par_iter()
                // `as i32` deliberately keeps only the low 32 bits of each square.
                .map(|&x| x.wrapping_mul(x) as i32)
                .reduce(|| 0, i32::wrapping_add),
        )
    })
    .await
    .expect("spawn_blocking task panicked or was cancelled")
}
/// Runs a Rayon-parallel sum of squares directly inside the async function.
///
/// Nothing is handed off to Tokio here: the thread that polls this future builds
/// the input `Vec` and then waits inside `par_iter` until Rayon has finished. The
/// vector is built inside the measured call, so its allocation is timed too.
///
/// The reduction uses wrapping addition: the true sum exceeds `i32::MAX` for all
/// but small sizes, and an overflow-checked `sum()` would panic in debug-style
/// builds. Wrapping addition is associative and commutative, so the result does
/// not depend on how Rayon splits the work.
async fn cpu_intensive_task_with_rayon_only(size: usize) -> i32 {
    let data: Vec<usize> = black_box((0..size).collect());
    black_box(
        data.par_iter()
            // `as i32` deliberately keeps only the low 32 bits of each square.
            .map(|&x| x.wrapping_mul(x) as i32)
            .reduce(|| 0, i32::wrapping_add),
    )
}
// Example of using a custom thread pool and batching
async fn cpu_intensive_task_with_rayon_thread_pool(size: usize) -> i32 {
let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap(); // Customize the number of threads
pool.install(|| {
let data: Vec<usize> = black_box((0..size).collect());
black_box(data.par_chunks(1024) // Adjust the chunk size based on your task's granularity
.map(|chunk| chunk.iter().map(|&x| black_box(x * x) as i32).sum::<i32>())
.sum())
})
}
/// Registers every workload variant with Criterion, once per input size.
///
/// Each size gets its own benchmark group named `CPU Task Size <size>`, and every
/// variant in a group is driven through one shared Tokio runtime via `to_async`.
fn cpu_tasks(c: &mut Criterion) {
    const INPUT_SIZES: [usize; 5] = [1024, 100_000, 1_000_000, 10_000_000, 100_000_000];

    // One runtime for the whole run; each async benchmark borrows it.
    let runtime = Runtime::new().unwrap();

    for size in INPUT_SIZES {
        let group_name = format!("CPU Task Size {}", size);
        let mut bench_group = c.benchmark_group(group_name);

        // Baseline: compute inline on the polling thread.
        let direct_id = BenchmarkId::new("Direct", size);
        bench_group.bench_function(direct_id, |bencher| {
            bencher.to_async(&runtime).iter(|| cpu_intensive_task(size))
        });

        // Same computation, handed to Tokio's blocking pool.
        let blocking_id = BenchmarkId::new("Tokio spawn_blocking", size);
        bench_group.bench_function(blocking_id, |bencher| {
            bencher
                .to_async(&runtime)
                .iter(|| cpu_intensive_task_spawn_blocking(size))
        });

        // Rayon parallel iterator started from inside spawn_blocking.
        let rayon_blocking_id = BenchmarkId::new("Rayon (par_iter) in Tokio spawn_blocking", size);
        bench_group.bench_function(rayon_blocking_id, |bencher| {
            bencher
                .to_async(&runtime)
                .iter(|| cpu_intensive_task_with_rayon_in_spawn_blocking(size))
        });

        // Rayon parallel iterator called straight from the async function.
        let rayon_only_id = BenchmarkId::new("Rayon Only (par_iter)", size);
        bench_group.bench_function(rayon_only_id, |bencher| {
            bencher
                .to_async(&runtime)
                .iter(|| cpu_intensive_task_with_rayon_only(size))
        });

        // Rayon with a custom thread pool and chunked input.
        let rayon_pool_id = BenchmarkId::new("Rayon Only (thread pool)", size);
        bench_group.bench_function(rayon_pool_id, |bencher| {
            bencher
                .to_async(&runtime)
                .iter(|| cpu_intensive_task_with_rayon_thread_pool(size))
        });

        bench_group.finish();
    }
}
// Bundle `cpu_tasks` into a Criterion benchmark group named `benches`.
criterion_group!(benches, cpu_tasks);
// Generate the `main` entry point that runs the `benches` group; this is why the
// bench target must be built without the default harness.
criterion_main!(benches);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment